Skip to content

Commit

Permalink
Merge branch 'master' into mqtt-io-dynamic-write
Browse files Browse the repository at this point in the history
  • Loading branch information
twosom authored Sep 27, 2024
2 parents 1d91ec0 + c2c640f commit d7f3c26
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 400 deletions.
8 changes: 4 additions & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,12 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))

## I/Os

* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465))

## New Features / Improvements

* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

Expand All @@ -78,6 +75,7 @@
as strings rather than silently coerced (and possibly truncated) to numeric
values. To retain the old behavior, pass `dtype=True` (or any other value
accepted by `pandas.read_json`).
* Users of KafkaIO Read transform that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) might encounter pipeline graph compatibility issues when updating the pipeline. To mitigate, set the `updateCompatibilityVersion` option to the SDK version used for the original pipeline, example `--updateCompatabilityVersion=2.58.1`

## Deprecations

Expand All @@ -87,6 +85,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
## Bugfixes

* (Java) Fixed custom delimiter issues in TextIO ([#32249](https://github.com/apache/beam/issues/32249), [#32251](https://github.com/apache/beam/issues/32251)).
* (Java, Python, Go) Fixed PeriodicSequence backlog bytes reporting, which was preventing Dataflow Runner autoscaling from functioning properly ([#32506](https://github.com/apache/beam/issues/32506)).
* (Java) Fix improper decoding of rows with schemas containing nullable fields when encoded with a schema with equal encoding positions but modified field order. ([#32388](https://github.com/apache/beam/issues/32388)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The Beam SDKs provide several abstractions that simplify the mechanics of large-

`PCollection`: A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.

`PTransform`: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as the input, performs a processing function that you provide on the elements of that PCollection, and then produces zero or more output PCollection objects.
`PTransform`: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes zero or more PCollection objects as the input, performs a processing function that you provide on the elements of that PCollection, and then produces zero or more output PCollection objects.
{{if (eq .Sdk "go")}}

`Scope`: The Go SDK has an explicit scope variable used to build a `Pipeline`. A Pipeline can return it’s root scope with the `Root()` method. The scope variable is then passed to `PTransform` functions that place them in the `Pipeline` that owns the `Scope`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ limitations under the License.

Apache Beam provides a portable API layer for building sophisticated data-parallel processing `pipelines` that may be executed across a diversity of execution engines, or `runners`. The core concepts of this layer are based upon the Beam Model (formerly referred to as the Dataflow Model), and implemented to varying degrees in each Beam `runner`.

### Direct runner
### Direct Runner
The Direct Runner executes pipelines on your machine and is designed to validate that pipelines adhere to the Apache Beam model as closely as possible. Instead of focusing on efficient pipeline execution, the Direct Runner performs additional checks to ensure that users do not rely on semantics that are not guaranteed by the model. Some of these checks include:

* enforcing immutability of elements
Expand Down Expand Up @@ -61,9 +61,9 @@ In java, you need to set runner to `args` when you start the program.
{{end}}

{{if (eq .Sdk "python")}}
In the Python SDK , the default is runner **DirectRunner**.
In the Python SDK , the **DirectRunner** is the default runner and is used if no runner is specified.

Additionally, you can read more about the Direct Runner [here](https://beam.apache.org/documentation/runners/direct/)
You can read more about the **DirectRunner** [here](https://beam.apache.org/documentation/runners/direct/)

#### Run example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,29 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Tour of Beam Programming Guide

Welcome to a Tour Of Beam, a learning guide you can use to familiarize yourself with the Apache Beam.
The tour is divided into a list of modules that contain learning units covering various Apache Beam features and principles.
You can access the full list of modules by clicking ‘<<’ button on the left . For each module, learning progress is displayed next to it.
Throughout the tour, you will find learning materials, examples, exercises and challenges for you to complete.
Learning units are accompanied by code examples that you can review in the upper right pane. You can edit the code, or just run the example by clicking the ‘Run’ button. Output is displayed in the lower right pane.
Each module also contains a challenge based on the material learned. Try to solve as many as you can, and if you need help, just click on the ‘Hint’ button or examine the correct solution by clicking the ‘Solution’ button.
Now let’s start the tour by learning some core Beam principles.
# Welcome to a Tour of Beam

The Tour of Beam is a learning guide you can use to familiarize yourself with **Apache Beam**.

The tour is divided into a list of modules that contain learning units covering various Apache Beam features and principles. You can access the full list of modules by clicking the ‘<<’ button on the left. For each module, learning progress is displayed next to it.

Throughout the tour, you will find:

- **Learning materials**
- **Examples**
- **Exercises**
- **Challenges** for you to complete

Learning units are accompanied by code examples that you can review in the upper right pane. You can:

- **Edit the code**
- **Run the example**

After running the example, the output will be displayed in the lower right pane.

Each module also contains a challenge based on the material learned. Try to solve as many as you can, and if you need help, just click on the:

- **Hint** button
- **Solution** button to examine the correct solution

Now, let’s start the tour by learning some core Beam principles!
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ private ExecuteWorkResult executeWork(
// If processing failed due to a thrown exception, close the executionState. Do not
// return/release the executionState back to computationState as that will lead to this
// executionState instance being reused.
LOG.info("Invalidating executor after work item {} failed with Exception:", key, t);
computationWorkExecutor.invalidate();

// Re-throw the exception, it will be caught and handled by workFailureProcessor downstream.
Expand Down
8 changes: 4 additions & 4 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module github.com/apache/beam/sdks/v2
go 1.21

require (
cloud.google.com/go/bigquery v1.62.0
cloud.google.com/go/bigquery v1.63.0
cloud.google.com/go/bigtable v1.31.0
cloud.google.com/go/datastore v1.19.0
cloud.google.com/go/profiler v0.4.1
Expand Down Expand Up @@ -185,9 +185,9 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
)
16 changes: 8 additions & 8 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ cloud.google.com/go/bigquery v1.47.0/go.mod h1:sA9XOgy0A8vQK9+MWhEQTY6Tix87M/Zur
cloud.google.com/go/bigquery v1.48.0/go.mod h1:QAwSz+ipNgfL5jxiaK7weyOhzdoAy1zFm0Nf1fysJac=
cloud.google.com/go/bigquery v1.49.0/go.mod h1:Sv8hMmTFFYBlt/ftw2uN6dFdQPzBlREY9yBh7Oy7/4Q=
cloud.google.com/go/bigquery v1.50.0/go.mod h1:YrleYEh2pSEbgTBZYMJ5SuSr0ML3ypjRB1zgf7pvQLU=
cloud.google.com/go/bigquery v1.62.0 h1:SYEA2f7fKqbSRRBHb7g0iHTtZvtPSPYdXfmqsjpsBwo=
cloud.google.com/go/bigquery v1.62.0/go.mod h1:5ee+ZkF1x/ntgCsFQJAQTM3QkAZOecfCmvxhkJsWRSA=
cloud.google.com/go/bigquery v1.63.0 h1:yQFuJXdDukmBkiUUpjX0i1CtHLFU62HqPs/VDvSzaZo=
cloud.google.com/go/bigquery v1.63.0/go.mod h1:TQto6OR4kw27bqjNTGkVk1Vo5PJlTgxvDJn6YEIZL/E=
cloud.google.com/go/bigtable v1.31.0 h1:/uVLxGVRbK4mxK/iO89VqXcL/zoTSmkltVfIDYVBluQ=
cloud.google.com/go/bigtable v1.31.0/go.mod h1:N/mwZO+4TSHOeyiE1JxO+sRPnW4bnR7WLn9AEaiJqew=
cloud.google.com/go/billing v1.4.0/go.mod h1:g9IdKBEFlItS8bTtlrZdVLWSSdSyFUZKXNS02zKMOZY=
Expand Down Expand Up @@ -1308,8 +1308,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1609,8 +1609,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -1619,8 +1619,8 @@ golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 h1:LLhsEBxRTBLuKlQxFBYUOU8xyFgXv6cOTp2HASDlsDk=
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,6 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/** Return {@literal true} if {@code topic} exists. */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -373,21 +372,6 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
try {
publisherStub().getTopic(request);
return true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
return false;
}

throw e;
}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Loading

0 comments on commit d7f3c26

Please sign in to comment.