From a4df016461a6ff2ceebfffe2d9e7da19ee9dcf9b Mon Sep 17 00:00:00 2001 From: Abzal Tuganbay Date: Tue, 27 Jun 2023 03:31:24 +0600 Subject: [PATCH] [Tour of Beam] add work example (#27080) * add work example * correct * correct tags for bigquery examples * correct read-query * correct read-query tag * correct imports * remove package * correct * fixed example name --------- Co-authored-by: mende1esmende1es Co-authored-by: Oleh Borysevych --- .../read-query/go-example/main.go | 74 ++++++++++--------- .../read-query/java-example/Task.java | 64 ++++++++-------- .../read-query/python-example/task.py | 52 +++++++------ .../read-table/go-example/main.go | 3 +- .../read-table/java-example/Task.java | 6 +- .../read-table/python-example/task.py | 9 +-- 6 files changed, 107 insertions(+), 101 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go index fec979ad7eda0..49ab6057bac25 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go @@ -18,9 +18,11 @@ // beam-playground: // name: read-query -// description: BigQuery read query example. +// description: BigQueryIO read query example. // multifile: false -// context_line: 40 +// context_line: 42 +// never_run: true +// always_run: true // categories: // - Quickstart // complexity: ADVANCED @@ -29,47 +31,49 @@ package main import ( - _ "context" - _ "flag" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + + "cloud.google.com/go/bigquery" internal_log "log" - _ "reflect" + "reflect" ) -// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments. -//The bigquery tag in the struct field is used to map the struct field to the BigQuery column. -type CommentRow struct { - Text string `bigquery:"text"` +type Game struct { + GameID bigquery.NullString `bigquery:"gameId"` + GameNumber bigquery.NullInt64 `bigquery:"gameNumber"` + SeasonID bigquery.NullString `bigquery:"seasonId"` + Year bigquery.NullInt64 `bigquery:"year"` + Type bigquery.NullString `bigquery:"type"` + DayNight bigquery.NullString `bigquery:"dayNight"` + Duration bigquery.NullString `bigquery:"duration"` } -// Construct the BigQuery query: A constant query is defined that selects the text column -// from the bigquery-public-data.hacker_news.comments table for a certain time range. -const query = `SELECT text -FROM ` + "`bigquery-public-data.hacker_news.comments`" + ` -WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01' -LIMIT 1000 -` - func main() { internal_log.Println("Running Task") - /* - ctx := context.Background() - p := beam.NewPipeline() - s := p.Root() - project := "tess-372508" - // Build a PCollection by querying BigQuery. - rows := bigqueryio.Query(s, project, query, - reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL()) + ctx := context.Background() + p := beam.NewPipeline() + s := p.Root() + project := "apache-beam-testing" - debug.Print(s, rows) + // Build a PCollection by querying BigQuery. + rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`", + reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL()) - // Now that the pipeline is fully constructed, we execute it. - if err := beamx.Run(ctx, p); err != nil { - log.Exitf(ctx, "Failed to execute job: %v", err) - }*/ + fixedSizeLines := top.Largest(s, rows, 5, less) + + debug.Print(s, fixedSizeLines) + // Now that the pipeline is fully constructed, we execute it. + if err := beamx.Run(ctx, p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } +} +func less(a, b Game) bool { + return true } diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java index 256c70919ce77..12c1fbcd9b48f 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java @@ -27,11 +27,10 @@ // tags: // - hellobeam +import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; -import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -40,52 +39,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Task { +public class Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - public static void main(String[] args) { - LOG.info("Running Task"); - System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - options.setTempLocation("gs://bucket"); - options.as(BigQueryOptions.class).setProject("project-id"); + private static final String WEATHER_SAMPLES_QUERY = + "select * from `clouddataflow-readonly.samples.weather_stations`"; - Pipeline pipeline = Pipeline.create(options); + public static void applyBigQueryTornadoes(Pipeline p) { + /*TypedRead bigqueryIO = + BigQueryIO.readTableRows() + .fromQuery(WEATHER_SAMPLES_QUERY) + .usingStandardSql(); - // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection. - // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record. - // The .fromQuery("SELECT field FROM project-id.dataset.table") - // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively. -/* - PCollection pCollection = pipeline - .apply(BigQueryIO.read( - (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field")) - .fromQuery( - "SELECT field FROM `project-id.dataset.table`") - .usingStandardSql() - .withCoder(DoubleCoder.of())); - pCollection - .apply("Log words", ParDo.of(new LogOutput<>())); -*/ - pipeline.run(); + PCollection rowsFromBigQuery = p.apply(bigqueryIO); + + rowsFromBigQuery + .apply(ParDo.of(new LogOutput<>("Result: ")));*/ + } + + public static void runBigQueryTornadoes(PipelineOptions options) { + Pipeline p = Pipeline.create(options); + applyBigQueryTornadoes(p); + p.run().waitUntilFinish(); + } + + public static void main(String[] args) { + PipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); + runBigQueryTornadoes(options); } static class LogOutput extends DoFn { private final String prefix; - LogOutput() { - this.prefix = "Processing element"; - } - LogOutput(String prefix) { this.prefix = prefix; } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info(prefix + ": {}", c.element()); + public void processElement(ProcessContext c) { + LOG.info(prefix + c.element()); + c.output(c.element()); } } -} \ No newline at end of file +} diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py index 6204635c630f3..fbb1e1e302d19 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py @@ -16,8 +16,10 @@ # beam-playground: # name: read-query -# description: TextIO read query example. +# description: BigQueryIO read query example. # multifile: false +# never_run: true +# always_run: true # context_line: 34 # categories: # - Quickstart @@ -26,39 +28,43 @@ # - hellobeam import argparse +import os +import warnings + import apache_beam as beam -from apache_beam.io import ReadFromText -from apache_beam.io import WriteToBigQuery -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions +from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery + +class WeatherData: + def __init__(self, station_number, wban_number, year, month, day): + self.station_number = station_number + self.wban_number = wban_number + self.year = year + self.month = month + self.day = day + def __str__(self): + return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}" def run(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--input', - dest='input', - default='gs://bucket', - help='Input file to process.') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True + pipeline_options.view_as(PipelineOptions) - """ - (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the - pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery( - query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >> - beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the - PCollection and logs the output to the console. It prints the 'field' column from each row in the table. - """ - with beam.Pipeline(options=pipeline_options) as p: - (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`'))) - # Each row is a dictionary where the keys are the BigQuery columns - #| beam.Map(lambda elem: elem['field']) - ) + with beam.Pipeline(options=pipeline_options, argv=argv) as p: + (p + # | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`',use_standard_sql=True, + # method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) + # | beam.combiners.Sample.FixedSizeGlobally(5) + # | beam.FlatMap(lambda line: line) + # | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day'])) + # | beam.Map(print) + ) if __name__ == '__main__': - run() + run() diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index 1751beb191e77..ef2462f4de362 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -28,6 +28,7 @@ // complexity: ADVANCED // tags: // - hellobeam + package main import ( @@ -62,7 +63,7 @@ func main() { s := p.Root() project := "apache-beam-testing" - // Build a PCollection by querying BigQuery. + // Build a PCollection by querying BigQuery. rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) fixedSizeLines := top.Largest(s, rows, 5, less) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java index 835954382a9d9..206a0c0b8ee03 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -35,7 +35,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sample; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index d57d9a145b874..e89779e5a26bd 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -16,7 +16,7 @@ # beam-playground: # name: read-table -# description: TextIO read table example. +# description: BigQueryIO read table example. # multifile: false # never_run: true # always_run: true @@ -28,13 +28,8 @@ # - hellobeam import argparse -import os -import warnings - import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions -#from google.cloud import bigquery -from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery class WeatherData: def __init__(self, station_number, wban_number, year, month, day): @@ -57,7 +52,7 @@ def run(argv=None): with beam.Pipeline(options=pipeline_options, argv=argv) as p: - (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', + (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) | beam.combiners.Sample.FixedSizeGlobally(5) | beam.FlatMap(lambda line: line)