[Tour of Beam] Learning content for "Introduction" module #23085

Merged Nov 7, 2022 · 34 commits

Commits
6a581d0
learning content for introduction module
Sep 8, 2022
4cabcb7
removing white spaces from md files
Sep 8, 2022
0aa2e28
delete whitespaces
sirenbyte Sep 8, 2022
52ab6a9
Merge remote-tracking branch 'origin/tob-introduction-module' into to…
sirenbyte Sep 8, 2022
e081553
delete whitespaces in python
sirenbyte Sep 8, 2022
2d40ee7
delete whitespace #2
sirenbyte Sep 8, 2022
7a07f51
divide pipeline concepts
sirenbyte Sep 9, 2022
4fd5735
add pipeline example concepts
sirenbyte Sep 9, 2022
7ad04d6
adding category tag to python examples
Sep 9, 2022
f7eb966
adding category to java examples
Sep 9, 2022
9204431
adding category to go examples
Sep 9, 2022
af66a8a
fixed go example
Sep 9, 2022
5322fe1
fixed go example compilation
Sep 9, 2022
11e2cf8
fixing python duplicate example names
Sep 9, 2022
dde4bf4
add runner concepts
sirenbyte Sep 9, 2022
b270bbd
Merge remote-tracking branch 'origin/tob-introduction-module' into to…
sirenbyte Sep 9, 2022
12fe2f3
fixing java examples
Sep 9, 2022
f1478c8
Merge branch 'tob-introduction-module' of https://github.com/akvelon/…
Sep 9, 2022
89f74ad
add licence for runner unit
sirenbyte Sep 12, 2022
53d6cec
some minor fixes for unit names
Sep 13, 2022
0cef6f5
fixed unit name
Sep 13, 2022
0e411cf
resolving CR comments
Sep 14, 2022
a94cc39
adding complexity to examples
Sep 27, 2022
5554db4
adding tags
Sep 27, 2022
6e1f374
fixed go example compilation
Sep 28, 2022
5ff346a
fixed python example with duplicate transform
Sep 28, 2022
4478c7a
change indent python
sirenbyte Sep 28, 2022
6debbed
fixing missing pipeline options
Sep 28, 2022
7e929cf
change arrow symbol
sirenbyte Oct 17, 2022
934b205
delete example prefix
sirenbyte Oct 24, 2022
f8e9e03
minor formatting and readability fixes
Oct 25, 2022
7444a90
add example description
sirenbyte Oct 28, 2022
33897ca
minor fix
sirenbyte Oct 31, 2022
b4af54d
minor code review comment
Nov 7, 2022
22 changes: 22 additions & 0 deletions learning/tour-of-beam/learning-content/go/content-info.yaml
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sdk: Go
content:
- introduction
@@ -0,0 +1,56 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### Creating a PCollection

Now that you know how to create a Beam pipeline and pass parameters into it, it is time to learn how to create an initial `PCollection` and fill it with data.

There are several options:

→ You can create a `PCollection` from data stored in an in-memory collection class in your driver program.

→ You can also read data from a variety of external sources, such as local or cloud-based files, databases, or other sources, using Beam-provided I/O adapters.

Throughout the tour, most of the examples use either a `PCollection` created from in-memory data or data read from one of the cloud buckets "beam-examples" or "dataflow-samples". These buckets contain sample data sets created specifically for educational purposes.

We encourage you to take a look at these data sets, explore them, and use them while learning Apache Beam.

### Creating a PCollection from in-memory data

You can use the Beam-provided `Create` transform to create a `PCollection` from an in-memory Go collection. You apply the `Create` transform directly to the pipeline's root scope.

The following example code shows how to do this:

```
func main() {
	// First create the pipeline together with its root scope
	p, s := beam.NewPipelineWithRoot()

	// Now create a PCollection from a list of strings
	words := beam.Create(s, "To", "be", "or", "not", "to", "be", "that", "is", "the", "question")

	// Create a numerical PCollection
	numbers := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Downstream transforms would consume words and numbers;
	// the blank assignments just keep this snippet compilable.
	_, _, _ = p, words, numbers
}
```

### Playground exercise

You can find the complete code of this example in the playground window, where you can run the pipeline and experiment with it.
One difference you will notice is that it also contains a function that outputs `PCollection` elements to the console. Don’t worry if you don’t quite understand it, as the concept of `ParDo` transforms will be explained later in the course. Feel free, however, to use it in exercises and challenges to explore results.
Do you also notice the order in which elements of the `PCollection` appear in the console? Why is that? You can also run the example several times to see whether the output stays the same or changes.

@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// beam-playground:
//   name: ParDo
//   description: ParDo example.
//   multifile: false
//   context_line: 32
//   categories:
//     - Quickstart
//   complexity: BASIC
//   tags:
//     - hellobeam

package main

import (
	"context"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
	p, s := beam.NewPipelineWithRoot()

	// Create a PCollection from an in-memory list of strings.
	words := beam.Create(s, "Hello", "world", "it`s", "Beam")

	output(s, words)

	err := beamx.Run(context.Background(), p)
	if err != nil {
		log.Exitf(context.Background(), "Failed to execute job: %v", err)
	}
}

// output prints every element of the input PCollection to the console.
func output(s beam.Scope, input beam.PCollection) {
	beam.ParDo0(s, func(element interface{}) {
		fmt.Println(element)
	}, input)
}
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: from-memory
name: Creating in-memory PCollections
taskName: ParDo
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: creating-collections
name: Creating Collections
content:
- from-memory
- reading-from-text
- reading-from-csv
@@ -0,0 +1,35 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

### Reading from a CSV file

Data processing pipelines often work with tabular data. In many examples and challenges throughout the course, you’ll be working with one of the datasets stored as CSV files in the "beam-examples" or "dataflow-samples" buckets.
Loading data from a CSV file takes two steps (sketched below):
* Load text lines using the `TextIO.Read` transform
* Parse the lines of text into a tabular format
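
A rough sketch of these two steps in Go might look like the following. It is a condensed version of the full example below; `lines` and `costs` are illustrative names, and the ride cost is assumed to sit in column 17 (index 16) of the taxi sample:

```
// Step 1: load the raw text lines of the CSV file.
lines := textio.Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv")

// Step 2: parse each line, extracting the ride cost as a float64.
costs := beam.ParDo(s, func(line string) float64 {
	fields := strings.Split(strings.TrimSpace(line), ",")
	if len(fields) > 16 {
		if cost, err := strconv.ParseFloat(fields[16], 64); err == nil {
			return cost
		}
	}
	return 0.0
}, lines)
```

`costs` would then feed whatever aggregation or output transforms the pipeline needs.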

### Playground exercise

Try to experiment with the example in the playground window and modify the code to process other fields from the New York taxi rides dataset.
Here is a small list of fields and an example record from this dataset:

| cost | passenger_count | ... |
|------|-----------------|-----|
| 5.8 | 1 | ... |
| 4.6 | 2 | ... |
| 24 | 1 | ... |

You can preview a sample [file](https://storage.googleapis.com/apache-beam-samples/nyc_taxi/misc/sample1000.csv) to see the full set of fields.
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// beam-playground:
//   name: CSV
//   description: CSV example.
//   multifile: false
//   context_line: 44
//   categories:
//     - Quickstart
//   complexity: BASIC
//   tags:
//     - hellobeam

package main

import (
	"context"
	"fmt"
	"strconv"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// less orders costs in ascending order, so top.Largest keeps the highest values.
func less(a, b float64) bool {
	return a < b
}

func main() {
	p, s := beam.NewPipelineWithRoot()

	file := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv")

	cost := applyTransform(s, file)

	// Keep only the 10 largest costs so the console output stays small.
	fixedSizeElements := top.Largest(s, cost, 10, less)

	output(s, "Total cost: ", fixedSizeElements)

	err := beamx.Run(context.Background(), p)
	if err != nil {
		log.Exitf(context.Background(), "Failed to execute job: %v", err)
	}
}

// Read reads from the filename(s) specified by a glob string and returns a PCollection<string>.
func Read(s beam.Scope, glob string) beam.PCollection {
	return textio.Read(s, glob)
}

// applyTransform parses each CSV line and extracts the ride cost as a float64.
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
	return beam.ParDo(s, func(line string) float64 {
		taxi := strings.Split(strings.TrimSpace(line), ",")
		if len(taxi) > 16 {
			cost, _ := strconv.ParseFloat(taxi[16], 64)
			return cost
		}
		return 0.0
	}, input)
}

// output prints each element of the input sample to the console with the given prefix.
func output(s beam.Scope, prefix string, input beam.PCollection) {
	beam.ParDo0(s, func(elements []float64) {
		for _, element := range elements {
			fmt.Println(prefix, element)
		}
	}, input)
}
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: from-csv
name: Creating PCollections from csv files
taskName: CSV
@@ -0,0 +1,41 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### Reading from a text file

You can use one of the Beam-provided I/O adapters to read from an external source. The adapters vary in their exact usage, but all of them read from some external data source and return a `PCollection` whose elements represent the data records in that source.
Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself.

`TextIO.Read`, for example, reads from an external text file and returns a `PCollection` whose elements are of type string. Each string represents one line from the text file. Here’s how you would apply `TextIO.Read` to your pipeline to create a `PCollection`:

```
func main() {
	// First create the pipeline together with its root scope
	p, s := beam.NewPipelineWithRoot()

	// Now create the PCollection by reading a text file.
	// One element is added for each line in the input file.
	lines := textio.Read(s, "gs://some/inputData.txt")

	// Downstream transforms would consume lines; the blank
	// assignments just keep this snippet compilable.
	_, _ = p, lines
}
```

### Playground exercise

In the playground window, you can find an example that reads the Shakespeare play King Lear from a text file stored in a Google Cloud Storage bucket and fills a `PCollection` first with individual lines and then with individual words. Try it out and see what the output is.
One of the differences you will see is that the output is much shorter than the input file itself. This is because the number of elements in the output `PCollection` is limited with the `top.Largest(s, lines, 10, less)` transform. Another technique you can use to limit the output sent to the console for debugging purposes with large input datasets is the `Sample.fixedSizeGlobally` transform.
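
As a rough illustration of that output-limiting idea, a debug stage might look like the sketch below. It assumes `lines` is a `PCollection` of strings read with `textio.Read` and that keeping the 10 longest lines is an acceptable sample; the actual playground example may differ:

```
// Keep only the 10 longest lines so the console output stays manageable.
longest := top.Largest(s, lines, 10, func(a, b string) bool {
	return len(a) < len(b)
})

// Print the surviving sample to the console (ParDo0 is covered later in the course).
beam.ParDo0(s, func(sample []string) {
	for _, line := range sample {
		fmt.Println(line)
	}
}, longest)
```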
You can preview the input [file](https://storage.googleapis.com/apache-beam-samples/shakespeare/kinglear.txt) used by this example.