Commit a4df016
[Tour of Beam] add work example (apache#27080)
* add work example

* correct

* correct tags for bigquery examples

* correct read-query

* correct read-query tag

* correct imports

* remove package

* correct

* fixed example name

---------

Co-authored-by: mende1esmende1es <mende1esmende1es@gmail.cp>
Co-authored-by: Oleh Borysevych <oleg.borisevich@akvelon.com>
3 people authored and cushon committed May 24, 2024
1 parent 79d628e commit a4df016
Showing 6 changed files with 107 additions and 101 deletions.
File 1 of 6: Go read-query example
@@ -18,9 +18,11 @@
 
 // beam-playground:
 // name: read-query
-// description: BigQuery read query example.
+// description: BigQueryIO read query example.
 // multifile: false
-// context_line: 40
+// context_line: 42
+// never_run: true
+// always_run: true
 // categories:
 // - Quickstart
 // complexity: ADVANCED
@@ -29,47 +31,49 @@
 package main
 
 import (
-	_ "context"
-	_ "flag"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
-	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"context"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
 
+	"cloud.google.com/go/bigquery"
 	internal_log "log"
-	_ "reflect"
+	"reflect"
 )
 
-// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments.
-//The bigquery tag in the struct field is used to map the struct field to the BigQuery column.
-type CommentRow struct {
-	Text string `bigquery:"text"`
+// Define the data model: the Game struct models one row of the
+// bigquery-public-data.baseball.schedules table. The bigquery tag on each
+// struct field maps the field to the corresponding BigQuery column.
+type Game struct {
+	GameID     bigquery.NullString `bigquery:"gameId"`
+	GameNumber bigquery.NullInt64  `bigquery:"gameNumber"`
+	SeasonID   bigquery.NullString `bigquery:"seasonId"`
+	Year       bigquery.NullInt64  `bigquery:"year"`
+	Type       bigquery.NullString `bigquery:"type"`
+	DayNight   bigquery.NullString `bigquery:"dayNight"`
+	Duration   bigquery.NullString `bigquery:"duration"`
 }
 
-// Construct the BigQuery query: A constant query is defined that selects the text column
-// from the bigquery-public-data.hacker_news.comments table for a certain time range.
-const query = `SELECT text
-FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
-WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
-LIMIT 1000
-`
-
 func main() {
 	internal_log.Println("Running Task")
-	/*
-		ctx := context.Background()
-		p := beam.NewPipeline()
-		s := p.Root()
-		project := "tess-372508"
 
-		// Build a PCollection<CommentRow> by querying BigQuery.
-		rows := bigqueryio.Query(s, project, query,
-			reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())
+	ctx := context.Background()
+	p := beam.NewPipeline()
+	s := p.Root()
+	project := "apache-beam-testing"
 
-		debug.Print(s, rows)
+	// Build a PCollection<Game> by querying BigQuery.
+	rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`",
+		reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL())
 
-		// Now that the pipeline is fully constructed, we execute it.
-		if err := beamx.Run(ctx, p); err != nil {
-			log.Exitf(ctx, "Failed to execute job: %v", err)
-		}*/
+	fixedSizeLines := top.Largest(s, rows, 5, less)
+
+	debug.Print(s, fixedSizeLines)
+
+	// Now that the pipeline is fully constructed, we execute it.
+	if err := beamx.Run(ctx, p); err != nil {
+		log.Exitf(ctx, "Failed to execute job: %v", err)
+	}
 }
+
+func less(a, b Game) bool {
+	return true
+}
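
Note on the comparator: the committed less function returns true for every pair, so top.Largest(s, rows, 5, less) selects five essentially arbitrary rows. A minimal sketch of a field-based comparator, assuming the intent is to surface the most recent games and using Year and GameNumber as the ordering keys (illustrative only, not part of the commit):

// Drop-in alternative to the always-true comparator above. It orders
// games by year, breaking ties by game number; a NullInt64 is only
// meaningful when Valid is set, so NULL years sort as smallest.
func less(a, b Game) bool {
	if a.Year.Valid != b.Year.Valid {
		return !a.Year.Valid // NULL years sort before real years
	}
	if a.Year.Int64 != b.Year.Int64 {
		return a.Year.Int64 < b.Year.Int64
	}
	return a.GameNumber.Int64 < b.GameNumber.Int64
}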
File 2 of 6: Java read-query example
@@ -27,11 +27,10 @@
 // tags:
 //   - hellobeam
 
+import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -40,52 +39,49 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Task {
-
     private static final Logger LOG = LoggerFactory.getLogger(Task.class);
 
-    public static void main(String[] args) {
-        LOG.info("Running Task");
-        System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
-        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-        options.setTempLocation("gs://bucket");
-        options.as(BigQueryOptions.class).setProject("project-id");
+    private static final String WEATHER_SAMPLES_QUERY =
+        "select * from `clouddataflow-readonly.samples.weather_stations`";
 
-        Pipeline pipeline = Pipeline.create(options);
+    public static void applyBigQueryTornadoes(Pipeline p) {
+        /*TypedRead<TableRow> bigqueryIO =
+            BigQueryIO.readTableRows()
+                .fromQuery(WEATHER_SAMPLES_QUERY)
+                .usingStandardSql();
 
-        // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection.
-        // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record.
-        // The .fromQuery("SELECT field FROM project-id.dataset.table")
-        // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively.
-        /*
-        PCollection<Double> pCollection = pipeline
-            .apply(BigQueryIO.read(
-                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field"))
-                .fromQuery("SELECT field FROM `project-id.dataset.table`")
-                .usingStandardSql()
-                .withCoder(DoubleCoder.of()));
-        pCollection
-            .apply("Log words", ParDo.of(new LogOutput<>()));
-        */
-        pipeline.run();
+        PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);
+
+        rowsFromBigQuery
+            .apply(ParDo.of(new LogOutput<>("Result: ")));*/
+    }
+
+    public static void runBigQueryTornadoes(PipelineOptions options) {
+        Pipeline p = Pipeline.create(options);
+        applyBigQueryTornadoes(p);
+        p.run().waitUntilFinish();
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options =
+            PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
+        runBigQueryTornadoes(options);
     }
 
     static class LogOutput<T> extends DoFn<T, T> {
         private final String prefix;
 
-        LogOutput() {
-            this.prefix = "Processing element";
-        }
-
         LogOutput(String prefix) {
             this.prefix = prefix;
         }
 
         @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            LOG.info(prefix + ": {}", c.element());
+        public void processElement(ProcessContext c) {
+            LOG.info(prefix + c.element());
             c.output(c.element());
         }
     }
 }
File 3 of 6: Python read-query example
@@ -16,8 +16,10 @@
 
 # beam-playground:
 # name: read-query
-# description: TextIO read query example.
+# description: BigQueryIO read query example.
 # multifile: false
+# never_run: true
+# always_run: true
 # context_line: 34
 # categories:
 # - Quickstart
@@ -26,39 +28,43 @@
 # - hellobeam
 
 import argparse
-import os
-import warnings
 
 import apache_beam as beam
-from apache_beam.io import ReadFromText
-from apache_beam.io import WriteToBigQuery
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
+from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
+
+class WeatherData:
+    def __init__(self, station_number, wban_number, year, month, day):
+        self.station_number = station_number
+        self.wban_number = wban_number
+        self.year = year
+        self.month = month
+        self.day = day
+
+    def __str__(self):
+        return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}"
 
 def run(argv=None):
     parser = argparse.ArgumentParser()
     parser.add_argument('--input',
                         dest='input',
                         default='gs://bucket',
                         help='Input file to process.')
 
     known_args, pipeline_args = parser.parse_known_args(argv)
 
     pipeline_options = PipelineOptions(pipeline_args)
     pipeline_options.view_as(SetupOptions).save_main_session = True
+    pipeline_options.view_as(PipelineOptions)
 
-    """
-    (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the
-    pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery(
-    query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >>
-    beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the
-    PCollection and logs the output to the console. It prints the 'field' column from each row in the table.
-    """
-
-    with beam.Pipeline(options=pipeline_options) as p:
-        (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`')))
-        # Each row is a dictionary where the keys are the BigQuery columns
-        #| beam.Map(lambda elem: elem['field'])
-        )
+    with beam.Pipeline(options=pipeline_options, argv=argv) as p:
+        (p
+         # | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`', use_standard_sql=True,
+         #                                                  method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
+         # | beam.combiners.Sample.FixedSizeGlobally(5)
+         # | beam.FlatMap(lambda line: line)
+         # | beam.Map(lambda element: WeatherData(element['station_number'], element['wban_number'], element['year'], element['month'], element['day']))
+         # | beam.Map(print)
+        )
 
 
 if __name__ == '__main__':
-    run()
+    run()
File 4 of 6: Go read-table example
@@ -28,6 +28,7 @@
 // complexity: ADVANCED
 // tags:
 // - hellobeam
+
 package main
 
 import (
@@ -62,7 +63,7 @@ func main() {
 	s := p.Root()
 	project := "apache-beam-testing"
 
-	// Build a PCollection<CommentRow> by querying BigQuery.
+	// Build a PCollection<Game> by querying BigQuery.
 	rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{}))
 
 	fixedSizeLines := top.Largest(s, rows, 5, less)
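
The read-table and read-query Go examples differ only in how rows are produced: bigqueryio.Read takes a table ID in "project:dataset.table" form and reads the whole table, while bigqueryio.Query takes a SQL string (here using the standard-SQL backtick table form). A minimal self-contained sketch contrasting the two calls, assuming the same public baseball.schedules table and the Game struct from the read-query example; this is illustrative and not part of the commit:

package main

import (
	"context"
	"reflect"

	"cloud.google.com/go/bigquery"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

// Game mirrors bigquery-public-data.baseball.schedules, as in the
// read-query example above.
type Game struct {
	GameID     bigquery.NullString `bigquery:"gameId"`
	GameNumber bigquery.NullInt64  `bigquery:"gameNumber"`
	SeasonID   bigquery.NullString `bigquery:"seasonId"`
	Year       bigquery.NullInt64  `bigquery:"year"`
	Type       bigquery.NullString `bigquery:"type"`
	DayNight   bigquery.NullString `bigquery:"dayNight"`
	Duration   bigquery.NullString `bigquery:"duration"`
}

func main() {
	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()
	project := "apache-beam-testing"

	// Whole-table read: the table ID uses the "project:dataset.table" form.
	tableRows := bigqueryio.Read(s, project,
		"bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{}))

	// Query read: SQL with the standard-SQL backtick table form.
	queryRows := bigqueryio.Query(s, project,
		"select * from `bigquery-public-data.baseball.schedules`",
		reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL())

	debug.Print(s, tableRows)
	debug.Print(s, queryRows)

	// Execute the fully constructed pipeline.
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}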
File 5 of 6: Java read-table example
@@ -35,7 +35,11 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
File 6 of 6: Python read-table example
@@ -16,7 +16,7 @@
 
 # beam-playground:
 # name: read-table
-# description: TextIO read table example.
+# description: BigQueryIO read table example.
 # multifile: false
 # never_run: true
 # always_run: true
@@ -28,13 +28,8 @@
 # - hellobeam
 
 import argparse
-import os
-import warnings
-
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
-#from google.cloud import bigquery
-from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery
 
 class WeatherData:
     def __init__(self, station_number, wban_number, year, month, day):
@@ -57,7 +52,7 @@ def run(argv=None):
 
 
     with beam.Pipeline(options=pipeline_options, argv=argv) as p:
-        (p  | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
+        (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
                                                             method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
         | beam.combiners.Sample.FixedSizeGlobally(5)
         | beam.FlatMap(lambda line: line)
