Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document yaml pipeline options #30490

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions website/www/site/content/en/documentation/sdks/yaml.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ pipeline:
config:
topic: anotherPubSubTopic
format: json
options:
streaming: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be implicit due to the PubSub source. I don't know that we want to recommend setting it everywhere (though +1 to the section talking about options in the docs).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, I've had issues with Pub/Sub pipelines when the streaming option is not set. It could be an issue with xlang, but I agree it should be implicit, so it may be worth looking into fixing on the xlang side rather than explicitly defining in the YAML config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For python it is never implicit IIRC (we don't check whether sources are bounded or unbounded), so unless we're doing something special with yaml then I think we need this.

@tvalentyn to fact check me here, but I think we talked about making streaming implicit for Python but decided against it for back-compat reasons

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it implicit should not be backwards incompatible, as otherwise the pipeline wouldn't run, right?

If this is the case (we should confirm), I guess it's better to add it until the template is updated with the fix at the very least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tvalentyn to fact check me here, but I think we talked about making streaming implicit for Python

I don't recall. What's the behavior in other SDKs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it implicit should not be backwards incompatible, as otherwise the pipeline wouldn't run, right?

The pipeline will just try to run in batch mode, which is actually probably fine for some unbounded sources (I think). It will run in batch mode, but generally functions fine AFAIK.

With that said, I don't really fully remember what our concerns with opting in to streaming first mode are. There are use cases here (example), though they're pretty dubious IMO.

I don't recall. What's the behavior in other SDKs?

Other SDKs opt in any unbounded sources to streaming pipelines.

Copy link
Contributor

@tvalentyn tvalentyn Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes sense to have a consistent behavior across SDKs. If we consider making the change, we should run some queries re: # of pipelines that had a pubsub/kafka/periodicimpulse step and were not streaming. most of those if any might be launched in error? But if there are users who intentionally run such jobs, we need to see if they will have an opt-out mechanism from new behavior.. There is a --streaming option, but no --batch option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

```

Rather than using an explicit `WindowInto` operation, one may instead tag a
Expand All @@ -392,6 +394,8 @@ pipeline:
config:
topic: anotherPubSubTopic
format: json
options:
streaming: true
```

Note that the `Sql` operation itself is often a from of aggregation, and
Expand All @@ -417,6 +421,8 @@ pipeline:
config:
topic: anotherPubSubTopic
format: json
options:
streaming: true
```

The specified windowing is applied to all inputs, in this case resulting in
Expand Down Expand Up @@ -448,6 +454,8 @@ pipeline:
windowing:
type: fixed
size: 60s
options:
streaming: true
```

For a transform with no inputs, the specified windowing is instead applied to
Expand All @@ -473,6 +481,8 @@ pipeline:
config:
topic: anotherPubSubTopic
format: json
options:
streaming: true
```

One can also specify windowing at the top level of a pipeline (or composite),
Expand All @@ -499,6 +509,8 @@ pipeline:
windowing:
type: fixed
size: 60
options:
streaming: true
```

Note that all these windowing specifications are compatible with the `source`
Expand Down Expand Up @@ -530,6 +542,9 @@ pipeline:
windowing:
type: fixed
size: 5m

options:
streaming: true
```


Expand Down Expand Up @@ -584,6 +599,31 @@ providers:
MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
```

## Pipeline Options

[Pipeline options](https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options)
are used to configure different aspects of your pipeline, such as the pipeline runner that will execute
your pipeline and any runner-specific configuration required by the chosen runner. To set pipeline options,
append an options block at the end of your yaml file. For example:

```
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
...
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
options:
streaming: true
```

## Other Resources

* [Example pipelines](https://gist.github.com/robertwb/2cb26973f1b1203e8f5f8f88c5764da0)
Expand Down
Loading