From eaca61d91dbf1d8031e78bd219aca9c48618fbdc Mon Sep 17 00:00:00 2001 From: steve Date: Wed, 31 Jul 2019 19:36:23 -0400 Subject: [PATCH] [Beam Release-2.37.0] Merge Conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: test commit Merge branch 'apache:master' into master Merge branch 'apache:master' into master Update README with latest PreCommit Jobs Update Postcommit jobs with latest jobs Update Performace job tests in readme update load job tests with latest updates update other jobs test with latest updates mismatch links fix update trigger phrase for some postCommit jobs correct trigger phrases in readme [BEAM-12391] update avro sink to close the opened file handle, instead of getting it by reference from the writer. This ensures that when we load the python writer implementation we don't run into an attribute error [BEAM-13482] Python fully qualified name external transforms. Add Python expansion service entry point. [BEAM-13509] Stop sharing SQS client between readers of same source. [BEAM-13565][Playground] Add GetPreparationOutput method into the .proto files; Regenerate files; update bom [BEAM-13569] Change Spark dependencies to implementation. They are needed to spin up a standalone Spark cluster. remove redundant dependency python sdk examples: Fixed typo in wordcount example. [BEAM-13591] Bump log4j2 version to 2.17.1 [BEAM-13459] Update CHANGES.md, add note about artifact caching for python jobs [BEAM-13587] Attempt to load AWS region from default provider chain in AwsOptions Merge pull request #16405: [BEAM-13587] Attempt to load AWS region from default provider chain in AwsOptions Merge pull request #16368: [BEAM-13509] Stop sharing SQS client between readers of same source. Add Flink runner support for OrderedListState. This version reads the list entirely into memory, so will not work well for large lists. Merge pull request #16404: [BEAM-13586] Fix NPE in DefaultS3ClientBuilderFactory Fix sdk_container_builder too many values to unpack error [BEAM-13480] Sickbay PubSubIntegrationTest.test_streaming_data_only on Dataflow (#16255) remove redundant testImplementation dependencies Merge pull request #16417 from y1chi/python_val [BEAM-13459] Fix sdk_container_builder too many values to unpack error [BEAM-13430] Swap to use "mainClass" instead of "main" since it was deprecated. (#16400) See https://docs.gradle.org/7.3.2/dsl/org.gradle.api.tasks.JavaExec.html?_ga=2.124534154.1653102406.1640909413-346869268.1640099071#org.gradle.api.tasks.JavaExec:main for details Fix remaining failing perf IT tests. [BEAM-13430] Replace deprecated "appendix" with "archiveAppendix" (#16401) See https://docs.gradle.org/7.3.2/dsl/org.gradle.api.tasks.bundling.AbstractArchiveTask.html?_ga=2.44668452.1653102406.1640909413-346869268.1640099071#org.gradle.api.tasks.bundling.AbstractArchiveTask:appendix for more details. Merge pull request #16415 from deadwind4/BEAM-13591 [BEAM-13015] Add jamm as a java agent to the Java SDK harness container (#16412) This allows for accurate object sizing for caching. [BEAM-13430] Partially revert https://github.com/apache/beam/commit/eaacf709a861ea1c260e3856b720266348b1aecc (#16419) Bump the version back to 2.37 Merge pull request #16246: [BEAM-12391] update avro sink to close the opened file handle directly Merge pull request #16266 [BEAM-13482] Python fully qualified name external transforms. [BEAM-13482] Python fully qualified name external transforms. Merge pull request #15863 from [BEAM-13184] Autosharding for JdbcIO.write* transforms * Supporting autosharding on JdbcIO.write transforms * Making autosharding optional * Adding validation * integration test * Reducing code duplication * Adding a maximum bundle size to avoid overwhelming the memory Merge pull request #15984: [BEAM-2791] OrderedListState for Flink [BEAM-11936] Enable FloatingPointAssertionWithinEpsilon errorprone check (#16261) [BEAM-11936] Enable LockNotBeforeTry errorprone check (#16259) [BEAM-11936] Enable errorprone unused checks (#16262) Add Nexmark Query 14 (#16337) This change adds Query14.java for testing the performance of Reshuffle transform. Co-authored-by: Arun Pandian Merge pull request #16396 from ibzib/BEAM-13569 [BEAM-13569] Change Spark dependencies to implementation. [BEAM-13015] Migrate all user state and side implementations to support caching. (#16263) This change also ensures that prefetch can be invoked on the iterable to prevent the prefetch being lost once the iterator is discarded. See https://s.apache.org/beam-fn-state-api-and-bundle-processing#heading=h.tms0ncgbzz6f Merge pull request #16418 from y1chi/perf-it-2 Fix remaining failing perf IT tests. [BEAM-13459] Update CHANGES.md, add note about artifact caching python jobs #16416 Upgrade python library versions in base_image_requirements.txt [BEAM-13015] Use 20% of memory when the maximum has been configured. (#16420) * [BEAM-13015] Use 20% of memory when the maximum has been configured. The boot.go always sets the -Xmx value but we need a fallback incase used in different environments. [BEAM-13567] Consolidate runner flag definition. (#16426) Merge pull request #16424 from y1chi/bump_pubsub Update python library versions in base_image_requirements.txt [BEAM-13601] Don't cache Row types for a schema. (#16427) This fixes apache_beam.tools.microbenchmarks_test.MicrobenchmarksTest.test_coders_microbenchmark. Also separate out the named tuple and row cases as they have little in common at this point. [BEAM-13430] Re-enable checkerframework (#16429) This fixes a bad merge conflicts that occurred on https://github.com/apache/beam/pull/16395 [BEAM-13430] Ensure that testRuntimeMigration depends on "default" configuration allowing us to get the main "jar" as a dependency. (#16430) This better mirrors what the "shadow" and "shadowTest" configurations do and what "testRuntime" used to do with gradle 6.x and earlier. Merge pull request #16277 from [BEAM-13124][Playground] Create readiness endpoint * [BEAM-13124][Playground] Implement handle func for /readiness address; Add deleting all prepared files/folders for RunCode Api method in case of any error with the cache; * [BEAM-13124][Playground] renaming * [BEAM-13124][Playground] refactoring; add check for numOfParallelJobs's value; * [BEAM-13124][Playground] merge with master; fix staticcheck issues; * [BEAM-13124][Playground] rename tests * [BEAM-13124][Playground] update comments Merge pull request #16314 from [BEAM-13260][Playground]Implement setup of CI test environment * Add module to create GKE cluster via terraform, helm chart to deploy to GKE and run examples CI and GH action to run all that * Update comments sections Merge pull request #16383 from [BEAM-13566][Playground] Add logic of sending preparation's output on the backend side * [BEAM-13565][Playground] Add GetPreparationOutput method into the .proto files; Regenerate files; * [BEAM-13566][Playground] Implement logic to save prepare step output Merge pull request #16382 from [BEAM-13565][Playground] Add GetPreparationOutput API method to .proto file [BEAM-13565][Playground] Add GetPreparationOutput API method to .proto file Merge pull request #16365 from [BEAM-13559][Playground] Remove tag in examples CD * [BEAM-13559][Playground] Remove beam playground tag from examples for CI/CD steps; Fix tests; * [BEAM-13559][Playground] Update test Merge pull request #16360 from [BEAM-13546][Playground] Update nginx configuration to enable embedding iframes * [Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83) * Update workflows for playground * Attempt to fix tests * Remove continue on error to catch errors * Fix linter problem for backend dockerfile * Update folder to run backend go linter * Moved flutter test to execution via gradle tasks * Revert "[Playground][BEAM-12941][Bugfix] Fix workflows for playground applications (#83)" (#88) This reverts commit b73f5f70ac4184b56a0a03922731c5f2f69b9566. * Added support for embedded iframe and also gzip compression of assets * Add gzip_static config to nginx Co-authored-by: Ilya Co-authored-by: Aydar Zainutdinov Co-authored-by: daria.malkova Co-authored-by: Pavel Avilov Co-authored-by: Aydar Farrakhov Merge pull request #16192 from [BEAM-13395] [Playground] Tag katas * Tag katas * Update tags * Update katas * Update playground/categories.yaml Co-authored-by: Ilya * Remove the repeated lines in categories.yaml; Edit tag; Co-authored-by: daria.malkova Co-authored-by: Ilya Merge pull request #16254 from [BEAM-13249][Playground] Security – Mock Network * Added proxy server with allow-list using mitmproxy * Added updated mock network dockerfile * Updated the format of proxy env variables; added proxy for Python backend * Added proxy for Go backend * Add python certificat * Stylefixes * Remove redundant Dockerfile * Added licenses * Added GCS to allow-list * Add allowed buckets * Applied Python style and changed comments * change name of lists * change name of lists * change name of the public bucket Co-authored-by: Sergey Kalinin Co-authored-by: daria-malkova Merge pull request #16377: [BEAM-13576] update bom [BEAM-12879] Prevented missing permission from failing GCS I/O 1. Errors caused by metrics gathering no longer fail the GCS I/O. 2. Added a debug level log about potential permission issue and missing project id label for reported metrics. 3. Added a test for the failing execution route. Merge pull request #16347: fix: move connector to use v1 BigQuery Storage Write API [BEAM-12879] Prevented missing permission from failing GCS I/O [BEAM-13603] Fix bug in apache_beam.utils.Shared (#16437) Co-authored-by: Ahmet Altay [BEAM-10345] Add an import guard to support recent google-cloud-spanner versions. (#16434) * Add an import guard to support recent google-cloud-python versions. * Ignore when library is not available [BEAM-13091] Generate missing staged names from hash for Dataflow runner add test [BEAM-13604] NPE while getting null from BigDecimal column Fixed empty labels treated as wildcard when matching cache files (#16440) 1. Fixed a test watching nested dictionaries by watching the concerned PCollections directly; 2. Fixed cache manager's "exists" implementations to avoid treating empty labels as wildcard when matching cache files. This bug is not a security issue but could cause unexpected behavior when getting materialized values of PCollections that are not cached. [BEAM-13570] Remove erroneous compileClasspath dependency. (#16438) * [BEAM-13570] Remove erroneous compileClasspath dependency. * Exclude unsupported test categories. [BEAM-13015] Plumb through process wide and bundle cache through the FnApiStateAccessor. (#16423) [BEAM-13015] Cache the state backed iterable used for large GBK results. (#16409) Merge pull request #16442: [BEAM-13604] NPE while getting null from BigDecimal column Fix formatting/alignment (#16443) Merge pull request #16183 from [BEAM-13427] [Playground] show logs for precompiled objects * [BEAM-13411][Playground] Add method to get logs of precompiledObjects * [BEAM-13427] show logs for precompiled objects * [BEAM-13427] add support for dots in pipeline optons * [BEAM-13427]: revert go file * [BEAM-13427]: fix build * [BEAM-13427] playground refactoring * [BEAM-13427] playground print logs errors Co-authored-by: AydarZaynutdinov [BEAM-10277] re-write encoding position tests to declare schema protos explicitly (#16267) Update local_env_tests.yml (#16444) [BEAM-13574] Filesystem abstraction Rename support (#16428) [BEAM-13597] Setup Go in github actions (#16446) [BEAM-13602] Prevented metrics gathering from failing bigtable io 1. bigtable mutate row might return None instead of Status when running into retryable errors. Handled the special case as DEADLINE_EXCEEDED because bigtable must have exhausted the retry timeout or when there is no retry policy inplace. 2. For all rows mutated, if the response status is not ok, that data might have lost. Added a TODO item for that. Merge pull request #15765 from ihji/BEAM-13091 [BEAM-13091] Generate missing staged names from hash for Dataflow runner Merge pull request #16161 from [BEAM-12164] Add Spanner Partition Metadata DAOs * [BEAM-12164] Add Spanner Partition Metadata DAOs * fix: remove metrics table from DAO * fix: fix compilation error * chore: fix linting violations * feat: add opencensus dependency * deps: update OpenCensus API to 0.30.0 Merge pull request #16203 from [BEAM-12164] Add Spanner Change Stream Mappers * [BEAM-12164] Add Spanner Partition Metadata DAOs * fix: remove metrics table from DAO * fix: fix compilation error * chore: fix linting violations * feat: add opencensus dependency * [BEAM-12164] Add Spanner Change Stream Mappers The mapper classes convert from Cloud Spanner Structs to the change stream models used by the connector. There are two mappers implemented: 1. For mapping to partition metadata models. 2. For mapping to change stream records (one of heartbeat, data or child partitions). * deps: update OpenCensus API to 0.30.0 [adhoc] Fix BigTableIO description Merge pull request #16453: [adhoc] Fix BigTableIO description [BEAM-8727] Bump software.amazon.awssdk to 2.17.106 make the code more pythonic [BEAM-13015] Remove dead code now that all instances have migrated to the state caching implementation. (#16447) [BEAM-13386] Add RLock support for cloudpickle (#16250) Fix overflow Merge pull request #16433 from danthev/py_overflow_fix [BEAM-13599] Fix overflow error in Python Datastore RampupThrottlingFn don't close streams in finalize Loosen typing extensions bound Merge pull request #16456: [BEAM-8727] Bump software.amazon.awssdk to 2.17.106 [BEAM-4868] Bump com.amazonaws to 1.12.135 [BEAM-13243][BEAM-8374] Add support for missing PublishResponse fields in SnsIO.Write (AWS SDK v2) Merge pull request #16454: [BEAM-4868] Bump com.amazonaws to 1.12.135 [BEAM-12092] Bump jedis to version 4.0.1 (#16287) Merge pull request #16464: [BEAM-13617] don't close streams in finalize [BEAM-13534] Add automated port polling to expansion service runner if port isn't provided (#16470) Merge pull request #16344 from [BEAM-13536][Playground][Bugfix] CI step doesn't log case with empty category value * [BEAM-13536][Playground] Add processing of empty value in Beam tag validation method * [BEAM-13536][Playground] Add comments Merge pull request #16359 from [BEAM-13545][Playground] Add GetValidationOutput API method to .proto file * [BEAM-13545][Playground] Add GetValidationOutput method into the .proto files; Regenerate files; * [BEAM-13545][Playground] Regenerating proto files Merge pull request #16384 from [BEAM-13308] [Playground] Getting baseFileFolder from environment * Getting baseFileFolder from environment * Replace 2 args to filepath.join on some methods * Refactoring code Merge pull request #16306 from [BEAM-13447] [Playground] Add filling of the chosen thumbs up/down button * [BEAM-13428] add google analytics to the playground * [BEAM-13428]: print analytics error on console * BEAM-13252] Feedback Thumbs Up/Down Functionality * [BEAM-13252] Deleted unnecessary comment * [BEAM-13447]: playground - add filling of the chosen thumbs up/down button * [BEAM-13447] playground - dont save feedback state on persitent storage * [BEAM-13447] update pubspec Co-authored-by: Alexander Merge pull request #16361 from [BEAM-13543][Playground] Add logic of sending validation's output on the backend side * [BEAM-13545][Playground] Add GetValidationOutput method into the .proto files; Regenerate files; * [BEAM-13543][Playground] Implement logic to save validation output * [BEAM-13543][Playground] minor refactoring * [BEAM-13543][Playground] change back from string to []byte * [BEAM-13545][Playground] Regenerating proto files [BEAM-12562] Dataframe pipe implementation (#16256) * [BEAM-12565] Series implementation of compare * [BEAM-12565] DataFrame implementation of compare * [BEAM-12565] Fixed minor issues and error checks * [BEAM-12562] Initial pipe implementation for DataFrame * [BEAM-12562] Pipe implementation for Series * [BEAM-12562] Pipe implementation for GroupBy * [BEAM-12562] Added tests for Series and GroupBy pipe * [BEAM-12562] Support for (callable, data_keyword) func in DataFrame, Series, and GroupBy Co-authored-by: Brian Hulette Merge pull request #16338 from [BEAM-13528][Playground] Add liveness check * [BEAM-13528][Playground] add liveness endpoint * [BEAM-13528][Playground] update comment [BEAM-13626] Remap expanded outputs after merging. (#16471) Merge pull request #16147 from [BEAM-13359] [Playground] Tag existing examples * Tag examples * Refactoring tags * Add multifile to tag * Updated categories of some examples * Edit tags * Edit tags * Update tags * fix spotless check * Update tags * MinimalWordCount.java and DistinctExample.java reads only one file * Fix checks * Change examples which read * from bucket to read one file * Update tags; Co-authored-by: Artur Khanin Co-authored-by: daria-malkova Co-authored-by: Pavel Avilov [BEAM-3221] Improve documentation in model pipeline protos (#16474) * clarify state caching tokens * fix documentation around side input types and valid request types. [BEAM-13614] Add OnWindowExpiration support to the Java SDK harness and proto translation. (#16458) This implementation adds a timer family spec in the event time domain and adds the field to the ParDoPayload mentioning which timer family spec represents the on window expiration callback. Optional args and kwargs for named external transforms. Merge pull request #16156 from [BEAM-13391] Fix temporary file format in WriteToBigQuery * Fix temporary file format in WriteToBigQuery * Change a desription * Fix pylint issue * Import BigQueryDisposition class * Combine both tets together * Fix lint issues Co-authored-by: Sayat Satybaldiyev Co-authored-by: Pablo Loosen typing extensions bound #16466 [adhoc] Run spotlessApply on java examples to fix master Merge pull request #16479: [adhoc] Run spotlessApply on java examples to fix master [BEAM-8806] Integration test for SqsIO [BEAM-13631] Add deterministic SQS message coder to fix reading from SQS in batch mode Merge pull request #16128: [BEAM-13243][BEAM-8374] Add support for missing PublishResponse fields in SnsIO.Write (AWS SDK v2) [BEAM-13628] Update SideInputCache to use full Transform and SideInputIDs as token information (#16483) [BEAM-13602] Prevented metrics gathering from failing bigtable io Merge pull request #16475 Optional args and kwargs for named external transforms. [BEAM-13432] Skip ExpansionService creation in Job Server (#16222) * [BEAM-13432] Update Java ExpansionService to be configurable of Environment 1. Update ExpansionService to use its own option instead of blindly inherit from pipeline when registering environment so it can be configurable. 2. Update JobServerDriver to skip creating expansion server when expansion port is negative. * Revert ExpansionService change * Add unit tests to test job server start up without expansion service. * Apply spotless check [BEAM-13616] Initial files for vendored gRPC 1.43.2 (#16460) Co-authored-by: Lukasz Cwik [BEAM-13638] Datatype of timestamp fields in SqsMessage for AWS IOs for SDK v2 was changed from String to long, visibility of all fields was fixed from package private to public Merge pull request #16491: [BEAM-13638] Fix visibility of fields in SqsMessage (AWS Sdk v2) [BEAM-13641][Playground] Add SCIO SDK support on the CI/CD step Merge pull request #16469 from [BEAM-13623][Playground] [Bugfix] During unit tests failing there is no any output * [BEAM-13623][Playground] Add getting of error's output from `RunOutput` in case unit test is failed * [BEAM-13623][Playground] fix of tests Merge pull request #16149 from [BEAM-13113] [Playground] playground frontend documentation * [BEAM-13113] playground frontend documentation * [BEAM-13113] playground frontend documentation structure * [BEAM-13113] fix flutter command * [BEAM-13113] remove trailing spaces * [BEAM-13133] update beam playground frontend readme Co-authored-by: Artur Khanin * [BEAM-13113] playground - fix typos on the docs Co-authored-by: Artur Khanin Merge pull request #16363 from [BEAM-13557] [Playground] show code execution time * [BEAM-13557] playground show code execution time * [BEAM-13557] fix playground reset * [BEAM-13557] playground - fix changing example * [BEAM-13557] improve null checks Merge pull request #16374 from [BEAM-13398][Playground] Split LifeCycle to DTO and business logic * [BEAM-13398][Playground] Add LifeCycleDto to separate DTO and business logic. * [BEAM-13398][Playground] Change LifeCycle structure * [BEAM-13398][Playground] Merge with master * [BEAM-13398][Playground] Merge with master Fix using `executable_files` value Fix comments * [BEAM-13398][Playground] renaming [BEAM-13616][BEAM-13646] Update vendored calcite 1.28.0 with protobuf 3.19.2 (#16473) Adds several example multi-language Python pipelines Merge pull request #16325 from [BEAM-13471] [Playground] Tag existing unit-tests * Tag java unit-test * Tag go unit-test * Tag python unit-test * Update tags * Fix checks * Edit tags * Update tags Merge pull request #16488: [BEAM-13637] Adds several example multi-language Python pipelines [BEAM-13399] Move service liveness polling to Runner type (#16487) Split builder into several builder for each step of pipeline execution [BEAM-8806] Integration test for SqsIO using Localstack Provide API to check whether a hint is known. [BEAM-13480] Increase pipeline timeout for PubSubIntegrationTest.test_streaming_data_only (#16496) Stronger typing inference for CoGBK. (#16465) Also fix empty union consistency check. [BEAM-12464] Change ProtoSchemaTranslator beam schema creation to match the order for protobufs containing Oneof fields (#14974) * ProtoSchemaTranslator now orders oneof fields in the resultant beam schema in accordance with their location in the protobuf definition * add reverse order protobuf * add noncontiguous oneof and some renaming * Comments and variable renaming * add reversed row tests * add noncontiguous tests * remove redundant null check * minor test comment update * update * add reversedonof test * add noncontiguous oneof test Co-authored-by: Reuben van Ammers Introduce the notion of a JoinIndex for fewer shuffles. (#16101) This allow joining (aka zipping) operations to execute without requiring a global repartitioning as long as the operands have a common, unchanged ancestor index. Also add counter and tests to ensure expected fusion. Merge pull request #16467 from [BEAM-12164]: SpannerIO DetectNewPartitions SDF * [BEAM-12164]: SpannerIO DetectNewPartitions SDF Adds the DetectNewPartitions SDF. This component will be responsible for: - Emitting a watermark based on the min of all unfinished partitions in the metadata table. - Querying all partitions in the CREATED state. - Updating the created partitions to SCHEDULED state. - Emitting the scheduled partitions to the PCollection. This SDF will run periodically as based on the configured resume interval (default is 100ms, best effort). * chore: fix linting violations Co-authored-by: Hengfeng Li [BEAM-12558] Fix doc typo. Merge pull request #16385 from [BEAM-13535] [Playground] add cancel execution button * [BEAM-13535] playground - add cancel execution button * [BEAM-13535] fix merging cancel button * [BEAM-13535] fix hash code to recommended * [BEAM-13535] cancel pipeline execution log * [BEAM-13535] playground - fix merge conflicts Merge pull request #16485 from [BEAM-13486] [Playground] For unit tests (java) if one of tests fails the output goes to stdOutput * [BEAM-13486] Added errorOutput to output tab * [BEAM-13486] Fixed tests for code repository * [BEAM-13486] Fixed runOutputError printing Merge pull request #16413 from blais/master [BEAM-13455] Remove duplicated artifacts when using multiple environments with Dataflow Java [BEAM-12164]: Add SDF for reading change stream records Adds ReadChangeStreamPartitionDoFn, which is an SDF to read partitions from change streams and process them accordingly. This component receives a change stream name, a partition, a start time and an end time to query. It then initiates a change stream query with the received parameters. Within a change stream, 3 types of records can be received: 1. A Data record 2. A Heartbeat record 3. A Child partitions record Upon receiving #1, the function updates the watermark with the record's commit timestamp and emits the record into the output PCollection. Upon receiving #2, the function updates the watermark with the record's timestamp, but it does not emit any record into the PCollection. Upon receiving #3, the function updates the watermark with the record's timestamp and writes the new child partitions into the metadata table. These partitions will be later scheduled by the DetectNewPartitions component. Once the change stream query for the element partition finishes, it marks the partition as finished in the metadata table and terminates. [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema [BEAM-13400] JDBC IO does not support UUID and JSONB PostgreSQL types and OTHER JDBC types in general * Support BLOB, CLOB, OTHER, JAVA_OBJECT JDBC types * Add native support of UUID type Merge pull request #16225 from ihji/BEAM-13455 [BEAM-13455] Remove duplicated artifacts when using multiple environm… [BEAM-12572] Run java examples on multiple runners (#16450) [BEAM-10206] Resolve go vet errors in protox package Merge pull request #16284: [BEAM-13400] JdbcIO should support UUID and JSONB PostgreSQL types and OTHER JDBC types in general [BEAM-13656] Provide an API to check whether a hint is known. #16508 Merge pull request #16477 from [BEAM-13560][Playground] Split builder into several builders for each step of the pipeline execution [BEAM-13560][Playground] Split builder into several builders for each step of the pipeline execution Merge pull request #16482 from [BEAM-13429][Playground] Add builder for preparers * Split builder into several builder for each step of pipeline execution * Add preparers builder * Rename preparators -> preparers * Small renaming at builders * Forgotten renaming [BEAM-13590] Fix abc imports from collections (#15850) Fix staticcheck errors in transforms directory Remove unnecessary fmt.Sprintf() in partition.go Replace bytes.Compare() with bytes.Equal() in test cases Replace string(buf.Bytes()) with buf.String() in coder_test.go Remove unnecessary blank identifier assignment in harness.go fix capitalized error strings in expansionx Clean up string cast of bytes in vet.go and corresponding tests Remove unnecessary fmt call in universal.go Remove tab from source. Redirecting cross-language transforms content (#16504) * redirected xlang content * updated redirect and added blurb for Java multi-lang quickstart * removing unnecessary alias from programming guide * improved opening sentence of Java multi-language pipelines page Merge pull request #16528 Remove tab from source. Merge pull request #16520: [BEAM-10206] Resolve go vet errors in protox package [BEAM-10206] Resolve go vet errors in protox package doc tweaks (#16498) [BEAM-12621] - Update Jenkins VMs to modern Ubuntu version (#16457) Co-authored-by: Giomar Osorio <95301399+GiomarWize@users.noreply.github.com> Merge pull request #16524: [BEAM-10206] Clean up some simple staticcheck warnings in Go SDK [BEAM-10206] Clean up some simple staticcheck warnings in Go SDK Bump beam container version. [BEAM-13664] Fix Primitives hashing benchmark (#16523) * [BEAM-13628] Fix Primitives hashing benchmark * staticcheck cleanup Merge pull request #16507: [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0 * [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0 Merge pull request #16468: [BEAM-8806] Integration test for SqsIO using Localstack (AWS Sdk v2) Remove jcenter repositories from gradle configuration. (#16532) JCenter has in read-only mode and appears to have degraded availability. https://blog.gradle.org/jcenter-shutdown Update GH Actions to use proper variables names and proper triggers Merge pull request #16492 from [BEAM-13641][Playground] Add SCIO SDK support on the CI/CD step [BEAM-13641][Playground] Add SCIO SDK support on the CI/CD step [BEAM-13430] Remove jcenter which will no longer contain any updates. (#16536) See https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter/ There are already issues with spurious timeouts/failures. This removes the remaining references left after https://github.com/apache/beam/pull/16532 [BEAM-13616] Update com.google.cloud:libraries-bom to 24.2.0 (#16509) [BEAM-13680] Fixed code_repository (added pipelineUuid to RunCodeResult when status is "Finished") Merge pull request #16480: [BEAM-13631] Add deterministic SQS message coder to fix reading from SQS in batch mode (AWS Sdk v1) Merge pull request #16481: [BEAM-8806] Integration test for SqsIO using Localstack (AWS Sdk v1) Also bump FnAPI container. [BEAM-13616][BEAM-13645] Switch to vendored grpc 1.43.2 (#16543) Merge pull request #16529 Bump dataflow worker container versions. [BEAM-13616][BEAM-13646] Upgrade vendored calcite to 1.28.0:0.2 (#16544) Merge pull request #16545 from [BEAM-13680] [Playground] [Bugfix] Fix tests on frontend [BEAM-13680] [Playground] [Bugfix] Fix tests on frontend Merge pull request #16486 from [BEAM-13544][Playground] Add logs to examples CI/CD to see the progress * [BEAM-13544][Playground] Add logs for the CI/CD steps Fix minor issues * [BEAM-13544][Playground] update logs' messages * [BEAM-13544][Playground] Update using of `tqdm` Add `tqdm` to requirements.txt * [BEAM-13544][Playground] Add logic to replace `\t` with spaces for tag * [BEAM-13544][Playground] Change using `logger` to `logging` * [BEAM-13544][Playground] Fix according to `linter` and `yapf` [BEAM-13683] Correct SQL transform schema, fix expansion address override bug (#16551) * Update expansion service address after overrides * Make options field unexported to match SQL transform schema Update walkthrough.md (#16512) Fixed naming error that leads to a ParseException. "appId" is the name of the field, but then in the query it erroneously said id in the WHERE. [BEAM-11808][BEAM-9879] Support aggregate functions with two arguments (#16200) * [BEAM-11808] Enable two params in aggregate functions, add string_agg with delimiter * [BEAM-11808] Fix checkstyle warning * [BEAM-11808] Fix spotlessApply * [BEAM-11808] Fix checkstyle warning * [BEAM-11808] Fix spotlessApply * [BEAM-11808] Change initialization StringAgg and minor fixes * [BEAM-11808] Fix checkstyle warnings * [BEAM-11808] Add test cases for array_agg and timestamp null max,min * [BEAM-11808] Remove leftover file * [BEAM-11808] Enable resolved_literal as firts arg * [BEAM-11808] Remove tests, validate RESOLVED_LITERAL as second argument * [BEAM-11808] Add unsupportedException for delimiter as ResolvedParam * use zetasql exception * update test Co-authored-by: Kyle Weaver Merge pull request #16506 from [BEAM-13652][Playground] Send examples' links to the frontend * [BEAM-13652][Playground] Add link to the PrecompiledObject * [BEAM-13652][Playground] Add a comment about `link` field Merge pull request #16322 from [BEAM-13407] [Playground] Preload fonts for the web application * [BEAM-13407] Added google fonts & licences to project * [BEAM-13407] Fixed license name in main.dart * [BEAM-13407] Removed unused font files * [BEAM-13407] Added licenses to google-fonts * [BEAM-13407] Changes in font licenses * [BEAM-13407] Fixed _getPipelineResult method of code_repository [BEAM-13665] Make SpannerIO projectID optional again (#16547) Fixes regression introduced by PR #15493 which inadvertently caused an NPE when the projectID was not specified for a SpannerIO read or write. Adds unit test for reading/writing both with and without projectID [BEAM-13015] Add state caching capability to be used as hint for runners to not duplicate cached data if the SDK can do it for user state and side inputs. (#16525) Merge pull request #16309: [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor [BEAM-13015] Provide caching statistics in the status client. (#16495) * [BEAM-13015] Provide caching statistics in the status client. * Address PR comments. * Address PR comments. * Address PR comments by weighing the key. [BEAM-13611] Skip test_xlang_jdbc_write (#16554) Merge pull request #16370 from [BEAM-13556] playground - color and scroll tabs with new content * [BEAM-13556] playground - color and scroll tabs with new content * [BEAM-13556] playground - add enter after the processing has started log * [BEAM-13556] fix tests Merge pull request #16531 from [BEAM-13567] [playground] Handle run code validation and preparation errors Merge pull request #16533 from [BEAM-13548] [Playground] Add example description popover Merge pull request #16519 from [BEAM-13639] [Playground] Add notification to Log/Output tabs about cached example * [BEAM-13639] Added log about cached results for precompiled examples * [BEAM-13639] Fixed _getPipelineResult method of code_repository Merge pull request #16518 from [BEAM-13619] [Playground] Add loading animation to the catalog * [BEAM-13619] Added loading animation to the catalog * [BEAM-13619] Refactored loading indicator component * [BEAM-13619] Added license to loading_indicator component * [BEAM-13619] Fixed _getPipelineResult method of code_repository edited README and comments in Python multi-lang pipes examples Merge pull request #16243 from darshan-sj/feature/support-priority-spannerio - Making rpcPriority a ValueProvider in SpannerConfig * Making rpcPriority a ValueProvider in SpannerConfig * correcting build failure * correcting tests * formatting changes Allow Java 17 to be used in SDK add testing support Add more testing support for java 17 [BEAM-13683] Make cross-language SQL example pipeline (#16567) [BEAM-13688] fixed type in BPG 4.5.3 window section (#16560) Remove obsolete commands from Inventory job. (#16564) workaround for jamm Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for JdbcIO.write* transforms" This reverts commit 818428ff092d41e408af38832c86538d15c95033. BEAM-13611 reactivating jdbcio xlang test Disable logging for memoization test. (#16556) Test loggers may cache logged items, extending their lifetime thus violating this test's expectations about when things will be gc'd. [BEAM-13689] Output token elements when BQ batch writes complete. Merge pull request #16472: [BEAM-13697] Add SchemaFieldNumber annotation Merge pull request #16373 from [BEAM-13515] [Playground] Hiding lines in an example that are not necessary * [BEAM-13515] Implemented structure of initial scroll in CodeField * [BEAM-13515] Changed scroll mechanics * [BEAM-13515] Moved values to consts, changed quotes to single * [BEAM-13515] Changed RegExp for Java, moved RegExp strings to consts * [BEAM-13515] Clarified variable declaration [BEAM-13093] Enable JavaUsingPython CrossLanguageValidateRunner test for dataflow runner v2 Merge pull request #16569 from Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for … Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for … Merge pull request #16371 from [BEAM-13518][Playground] Beam Playground quickstart page on the Beam website * [WIP] Beam Playground quickstart page * Add Beam Playground quickstart page in Get Started section * Removed trailing whitespace * Updated the content of Apache Beam Playground quickstart page * Updated Beam Playground quickstart page Update Java FnAPI beam master (#16572) [BEAM-13699] Replace fnv with maphash. (#16573) [BEAM-13693] Bump beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming timeout to 12 hours (#16576) Merge pull request #16380 from talatuyarer/BEAM-13577-select-uniquifyNames-nullablity [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema Merge pull request #16561 from [BEAM-13689] Output TableDestination elements when BQ batch writes complete. [BEAM-13689] Output TableDestination elements when BQ batch writes complete. [BEAM-10206] Remove Fatalf calls in non-test goroutines for tests/benchmarks (#16575) [BEAM-13687] Improved Spanner IO request count metrics Moved the creation of metrics counter to startbundle. [BEAM-13430] Re-add provided configuration (#16552) Merge pull request #16566: edited README and comments in Python multi-lang pipes examples Merge pull request #16514 from [BEAM-12164]: Add SDF for reading change stream records [BEAM-12164]: Add SDF for reading change stream records Merge pull request #16540 from [BEAM-13678][Playground]Update Github Action To Deploy Examples * Fix deploy of examples * Update license Merge pull request #16539 from [BEAM-13677][Playground]Update GitHub Actions to Build Playground Containers In Proper Way [BEAM-13677][Playground]Update GitHub Actions to Build Playground Containers In Proper Way Merge pull request #16546 from [BEAM-13661] [BEAM-13704] [Playground] Update tags for examples/katas/unit-tests * Add default_example field to the tags; Remove empty optional fields from tags; * Add quickstart category * Add new line in the categories.yaml Merge pull request #16369 from [BEAM-13558] [Playground] Hide the Graph tab and SCIO from SDK options * [BEAM-13558] Hid the Graph tab from output area & SCIO from SDK options * [BEAM-13515] Added clarifying comments [BEAM-10206] Add key for fields in wrapper (#16583) Merge pull request #16530 from Adding JSON support in SpannerIO and Spanner schema parser * Adding JSON support in SpannerIO and Spanner schema parser * formatting changes [BEAM-13687] Improved Spanner IO request count metrics [BEAM-13685] Enable users to specify cache directory under Interactive Beam options Add Jenkins test for Java 17 Fix jvm hex and skip errorprone Fix display data for anonymous classes fix jpms tests [BEAM-13716] Clear before creating a new virtual environment in setupVirtualenv [BEAM-13653] Make SnsIO.write topicArn optional. If provided, validate it and force usage on the Sns publish request. Merge pull request #16503: [BEAM-13653] Fix usage of SnsIO.write topicArn (AWS Sdk v2) Privacy policy update regarding Apache Beam Playground [BEAM-10897] Update the fastavro lower bound due to an issue on Windows (#16553) [BEAM-13605] Update pandas_doctests_test denylists in preparation for pandas 1.4.0 (#16571) Merge pull request #16538 from [BEAM-13676][Playground][Bugfix]Build Of Playground Applications Is Broken * Update build gradle files to fix build of applications without double dockerfile issues * Test fix Co-authored-by: Ilya Co-authored-by: Artur Khanin Merge pull request #16582 from [BEAM-13711] [Playground] [Bugfix] Add Cloud Logging API in allow list * Add cloud logging api in allow list * Add api to allowed_list Merge pull request #16515 from [BEAM-13636] [Playground] Checking the default field on CI/CD step * Checking the default field on CI/CD step * Support an optional fields * Update proto; * Refactoring code * Change default value for default_example field * Refactoring code [BEAM-13275] Removed the explicit selenium dependency from setup The dependency can be transitively installed through needle, no need to fix it in setup. [BEAM-13275] Removed the explicit selenium dependency from setup [BEAM-13321] Fix exception with BigQuery StreamWriter TraceID. When running a BigQuery IO transform with a query and query location set, was given the exception: "java.lang.IllegalArgumentException: TraceId must follow the format of A:B. Actual:Dataflow". Documentation shows an example of using it with the format "Dataflow:job_id" which is what I changed this to: https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter.Builder#com_google_cloud_bigquery_storage_v1_StreamWriter_Builder_setTraceId_java_lang_String_ [BEAM-10206] Deprecate unused shallow cloning functions (#16600) Bump Dataflow container versions (#16602) Merge pull request #16591 from ihji/BEAM-13716 [BEAM-13716] Clear before creating a new virtual environment in setupVirtualenv Improved multi-language pipelines section of the programming guide (#16587) * improved multi-language pipelines section of the programming guide * made changes to multi-lang pipeline content, in response to feedback * updating Beam version number for supporting no-code Java xlang transforms * updating one more Beam version number skip zetasql Get rid of unnessecary logs for BigQuery streaming writes in auto-sharding mode. [BEAM-13510] Don't retry on invalid SQS receipt handles. Support SCIO SDK via sbt projects Merge pull request #16478 from mosche/BEAM-13510-InvalidRetries Merge pull request #16478: [BEAM-13510] Don't retry on invalid SQS receipt handles (AWS Sdk v2) [BEAM-8807] Add integration test for SnsIO.write (Sdk v1 & v2) [BEAM-13736] Make lifting cache exact. (#16603) Merge branch 'master' of github.com:MarcoRob/beam into BEAM-12650 Merge pull request #16565 from [BEAM-13692][Playground] Implement method to receive Graph * [BEAM-13692][Playground] Add `GRAPH` subKey Add `GetGraph` api method Update `GetValue` cache method to correctly receive graph from the cache * [BEAM-13692][Playground] fixes according to `staticcheck` * [BEAM-13692][Playground] fix test * [BEAM-13692][Playground] Fix error with `RunError` output * [BEAM-13692][Playground] Change graph from []byte to string * [BEAM-13692][Playground] Edit comment * [BEAM-13692][Playground] Update comments Co-authored-by: Ilya Merge pull request #16502 from [BEAM-13650][Playground] Add link for examples * [BEAM-13650][Playground] Add link for examples. * [BEAM-13650][Playground] Change using string concatenation Fix according to `linter` and `yapf` * [BEAM-13650][Playground] Change the `link` field add pending jobs to readme Update README.md removed comment [BEAM-13310] remove call to get offset consumer config, which was rep… (#16588) * [BEAM-13310] remove call to get offset consumer config, which was replacing the intended group id with a randomly generated group id for commits * [BEAM-13310] run spotless commit * [BEAM-13310] fix checkstyle error Merge pull request #16599: [BEAM-13321] Fix exception with BigQuery StreamWriter TraceID. Merge pull request #16594 from [BEAM-13710][Playground] Update the Apache Beam privacy policy with Playground note [BEAM-13710][Playground] Update the Apache Beam privacy policy with Playground note [BEAM-11648] Share thread pool across RetryManager instances. RetryManager's have short lifetime (for example processElement). This means that many different threads are created instead of effectively reusing them. This is adds expense but also can result in many threads because the each individual thread lasts for up to 60 seconds and until the GC runs. Merge pull request #16595З: [BEAM-8807] Add integration test for SnsIO.write (Sdk v1 & v2) [BEAM-13737][Playground] Update logic of receiving precompiled objects; Increase timeout; added GitHub example references to Python multilang quickstart [BEAM-13746] Fix deserialization of SSECustomerKey for AWS Sdk v2 [BEAM-7928] Allow users to specify worker disk type for Dataflow runner (#16622) Merge pull request #16623: [BEAM-13746] Fix deserialization of SSECustomerKey for AWS Sdk v2 Exclude per-key order tests on Twister2 runner Merge pull request #16534 from [BEAM-13671][Playground] Add backend contribute guide to the project * [BEAM-13671][Playground] Add CONTRIBUTE.md file * [BEAM-13671][Playground] Update CONTRIBUTE.md [BEAM-13271] Bump errorprone to 2.10.0 (#16231) * Bump errorprone to 2.10.0 * Bump autoservice to 1.0.1 Works around https://github.com/google/error-prone/issues/2745 for some Beam projects * Address new check failures (non-controversial) * Address new check failures (potentially controversial) * Disable some new checks * Disable a few more checks * Address check failures in test code * Add license reference for jgit * Explicitly ignore some return values in CoGbkResultTest * Update sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java Co-authored-by: kileys * Update SnsIOTest Co-authored-by: kileys [BEAM-13595] Don't load main session when cloudpickle is used. (#16589) Merge pull request #15767 from ihji/BEAM-13093 [BEAM-13093] Enable JavaUsingPython CrossLanguageValidateRunner test … Update readme for XVR tests Merge pull request #16626 from ihji/update_readme Update readme for XVR tests [adhoc] Test S3Options and AwsOptions for Sdk v2 [BEAM-13537] Fix NPE in kafkatopubsub example (#16625) Merge pull request #16628: [adhoc] Test S3Options and AwsOptions for Sdk v2 [BEAM-13740] update java_tests.yml to remove setup-go, which is misconfigured and blocking the build (#16608) Fix google3 import error Merge pull request #16604: [BEAM-13714] Get rid of unnecessary logs for BigQuery streaming writes [BEAM-12976] Implement Java projection pushdown optimizer. (#16513) * finish basic pushdown optimizer implementation * add docstrings * add tests for non pbegin inputs * support pushdown on producers with multiple outputs; ignore producers with inputs. * make test code private * fix null checking * use beam null check instead of java util * vendored guava * nit: dont need to use string format * address review comments Make num-stages counter into an internal counter. This unbreaks tests that (arguably to brittly) reject extra counters. Merge pull request #16635 from ihji/fix_google3_import Fix google3 import error due to missing Python gradle build file [BEAM-13751] Don't block on gcloud when attempting to get default GCP region. Merge branch 'master' of github.com:apache/beam into java7tests [BEAM-13751] Parameterize wait timeout so test doesn't waste 2s. [BEAM-13751] Add comment explaining sleep. Merge pull request #16621: added GitHub example references to Python multilang quickstart Merge pull request #16579 from Revert "Revert "Merge pull request #15863 from [BEAM-13184] Autoshard… * Revert "Revert "Merge pull request #15863 from [BEAM-13184] Autosharding for JdbcIO.write* transforms"" This reverts commit 421bc8068fc561a358cfbf6c9842408672872120. * Using batchSize to define element batch size * Handle corner case for null list Merge pull request #16606 from [BEAM-13247] [Playground] Embedding iframe * [BEAM-13247] Building embedded iframe with run functionality * [BEAM-13247] Implemented Run button & log section to the embedded iframe * [BEAM-13247] Refactored components * [BEAM-13247] Restricted editing in embedded iframe * [BEAM-13247] Implemented editable/not editable versions of iframe, added Output/Log TabBar * [BEAM-13247] Redesigned page providers for EmbeddedPlaygroundPage, changed default ratio of the EmbeddedSplitView * [BEAM-13247] Fixes after merge * [BEAM-13247] Added possibilities to change height & use any text in iFrame, removed codeScrolling. Added new iFrame on the try-beam-playground page * [BEAM-13247] Changed iFrame src link * [BEAM-13247] Fixed PR remarks Update Python SDK beam-master tags (#16630) Merge pull request #16592 from [BEAM-13722][Playground] Add precompiling of the graph into examples CI/CD * [BEAM-13722][Playground] Add receiving graph value from backend to send it as a metadata field * [BEAM-13722][Playground] Add license * [BEAM-13722][Playground] Regenerate proto files * [BEAM-13722][Playground] Remove graph from meta file; Add sending of graph as a separate file; * [BEAM-13722][Playground] add condition for SDK * [BEAM-13722][Playground] Regenerate proto files * [BEAM-13722][Playground] Regenerate proto files * [BEAM-13722][Playground] small fix * [BEAM-13722][Playground] add try-except to receive graph Merge pull request #16505 from [BEAM-13527] [Playground] Pipeline options dialog * [BEAM-13527] pipeline options dropdown * [BEAM-13527] playground - parse pipeline error message * [BEAM-13527] playground - fix parse options * [BEAM-13527] playground - move pipelines options lines count to const * [BEAM-13527] playground fix tests * fix merge * [BEAM-13527] pipeline options fix review comments * [BEAM-13527] pipeline options fix review comments spotless spotless Generalize S3FileSystem to support multiple URI schemes. This ports https://github.com/apache/beam/pull/15036 to aws2. [BEAM-13768] Fix NullPointerException in BigQueryStorageSourceBase Fix Java SDK container image name for load-tests and nexmark Merge pull request #16639 from ibzib/BEAM-13751 [BEAM-13751] Don't block on gcloud when attempting to get default GCP… [BEAM-13293] XLang Jdbc IO for Go SDK (#16111) [BEAM-10206] Add Go Vet to Github Actions (#16612) Merge pull request #16607: [BEAM-13245] Generalize S3FileSystem to support multiple URI schemes. Change executable name fo go tests Fix java test [BEAM-13769] Skip test_main_session_not_staged_when_using_cloudpickle (#16651) Merge pull request #16663 from [BEAM-13773] [Playground] [Bugfix] Error during executing of tests on Java SDK [BEAM-13773] [Playground] [Bugfix] Error during executing of tests on Java SDK Merge pull request #16662 from [BEAM-13772][Playground][Bugfix] Change executable name for go tests [BEAM-13772][Playground][Bugfix] Change executable name for go tests [BEAM-6744] Support implicitly setting project id in Go Dataflow runner (#16643) Merge pull request #16493 from [BEAM-13632][Playground] Save catalog data to the cache * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Updates regarding comments * Update for environment_service_test.go * Docstring update for GetPrecompiledObjects Exclude jul-to-slf4j from Spark runner in quickstart POM templates (#16656) [BEAM-13734] Support cache directories that use GCS buckets [BEAM-11936] Enable a few errorprone checks that were broken by pinned guava in annotationProcessor (#16647) * Don't pin old guava for annotationProcessor configurations * Enable broken checks * Fix CassandraIOTest assertion [BEAM-13780] Add CONTRIBUTING.md pointing to main guide (#16666) Co-authored-by: Danny McCormick Merge pull request #16624 from [BEAM-13749] Exclude per-key order tests on Twister2 runner [BEAM-13749] Exclude per-key order tests on Twister2 runner [BEAM-13777] Accept cache capacity as input parameter instead of default max integer (#16652) * Change cache capacity to suggested default value instead of max integer to prevent out of memory error * Receive cache capacity as input prarmeter * Spotless fixes * [BEAM-13777] keep .of methods compatibility and use suggested default value for cache capacity by confluent [BEAM-13051][A] Enable pylint warnings (function-redefined/bad-super-call) (#16521) Merge pull request #16563 from [BEAM-13701][Playground] Support SCIO SDK via sbt projects [BEAM-13701][Playground] Support SCIO SDK via sbt projects [BEAM-13779] Fix pr labeling (#16665) Co-authored-by: Danny McCormick Merge pull request #16581 from [BEAM-12164]: Add SpannerIO.readChangeStreams * feat: add experimental spanner readChangeStreams Adds the SpannerIO.readChangeStreams feature that will enable users to consume a change stream from Cloud Spanner. This feature is under preview now, and can only be used for allowlisted customers. When reading a change stream the users will be able to operate on a PCollection of DataChangeRecords, containing the modifications made to the database as well as the type of operation. * fix: remove public api exposure of opencensus Do not expose the Opencensus TraceSampler in the SpannerIO.readChangeStreams. This is done so that we can upgrade the opencensus library without having to concern ourselves with which version customers are using. This commit also removes the deserializer option since it is not used. Co-authored-by: Zoe Cai Fix labeler trigger (#16674) Merge pull request #16619: [BEAM-11648] Share thread pool across RetryManager instances. Merge pull request #16645 from ihji/fix_nexmark_dataflow_v2 Fix Java SDK container image name for load-tests and nexmark [BEAM-13781] Exclude grpc-netty-shaded from gax-grpc's dependency (#16672) [BEAM-13051] Fixed pylint warnings : raising-non-exception (E0710), super-on-old-class (E1002) (#16541) * [BEAM-BEAM-13051] Fixed pylint warnings unexpeted-keyword-arg (E1123), undefined-variable (E0602) * Delete raising-non-exception and super-on-old-class from .pylintrc * Disable checks that broke tests * Delete extra blanck lines [BEAM-13740] Correctly install go before running tests (#16673) [BEAM-12830] Update local Docker env Go version. (#16670) [BEAM-13051][B] Enable pylint warnings (import-error/invalid-unary-operand-type) (#16522) * [BEAM-13051] Pylint invalid-unary-operand-type warning enabled * [BEAM-13051] Added unary negative operator to Timestamp * [BEAM-13051] Disabled invalid-unary-operand-type warning in specific cases [BEAM-13430] Revert Spark libraries in Spark runner to provided (#16675) [BEAM-12240] Add Java 17 support (#16568) Merge branch 'master' of github.com:apache/beam into java7tests [BEAM-13760] Add random component to default python dataflow job name (#16641) * Add random component to default python dataflow job name * Correct formattiing of multiline statements * Remove uuid from random string implementation. Fix trigger Merge pull request #16655 from [BEAM-12164]: Add retry protection to DetectNewPartitions * feat: add experimental spanner readChangeStreams Adds the SpannerIO.readChangeStreams feature that will enable users to consume a change stream from Cloud Spanner. This feature is under preview now, and can only be used for allowlisted customers. When reading a change stream the users will be able to operate on a PCollection of DataChangeRecords, containing the modifications made to the database as well as the type of operation. * fix: remove public api exposure of opencensus Do not expose the Opencensus TraceSampler in the SpannerIO.readChangeStreams. This is done so that we can upgrade the opencensus library without having to concern ourselves with which version customers are using. This commit also removes the deserializer option since it is not used. * [BEAM-12164]: Add retry protection to DetectNewPartitions The original algorithm of the DetectNewPartitions is susceptible to failures, because it produces side effects on every try which is not idempotent. Specifically, it marks the partitions as SCHEDULED in the Spanner database and outputs them. If there is a bundle commit failure, during retry, the already SCHEDULED partitions will not be picked up again. We change the algorithm in this PR to always schedule partitions that have a created at timestamp greater than the one saved in the DetectNewPartitions restriction. When scheduling the partitions, this SDF will also claim the created at of such partitions, advancing the timestamp saved. If there is a bundle commit failure, the restriction timestamp won't be saved, thus the partitions in the bundle will be picked up again regardless of their state. Co-authored-by: Zoe Cai Merge pull request #16586 from [BEAM-13731] FhirIO: Add support for BATCH bundle errors. * FhirIO: Add support for BATCH bundle errors. * SpotlessApply skip checker framework fix app name [BEAM-13011] Adds a link to Multi-language Pipelines Tips wiki page (#16649) * Adds a link to Multi-language Pipelines Tips wiki page * Address reviewer comment * Add anchor [BEAM-13734] Support cache directories that use GCS buckets remove duplicate property check [BEAM-12572] Run python examples on multiple runners (#16154) [BEAM-13574] Large Wordcount (#16455) Avoid packaging avro in the java harness jar. Unfortunately we can't simply shade it, as it's used in the public API (AvroCoder) of sdks:java:core. However, outside that use, it should not be needed when running in portable mode. Users will have to provide their own avro (typically by depending on sdks:java:core, but possibly choosing another version explicitly or from elsewhere). [BEAM-13293] Refactor JDBC IO Go Wrapper (#16686) * [BEAM-13293] refactor jdbcio * [BEAM-13293] resolve comments Edit license script for Java, add manual licenses for xz (#16692) [BEAM-13563] Restructure Kinesis Source for Aws 2 internally to prepare for ClientBuilderFactory Merge pull request #16657: [BEAM-13563] Restructure Kinesis Source for AWS 2 [BEAM-4665] Allow joining a running dataflow pipeline without throwing (#16689) [BEAM-13801] Add standard coder tests for state backed iterable. (#16696) Add coverage for Java and Python. Skip testing for Go. Go change a future follow-up. Merge branch 'master' of github.com:apache/beam into java7tests Merge pull request #16644: [BEAM-13768] Fix NullPointerException in BigQueryStorageSourceBase [BEAM-13430] Fix provided configuration by removing extendsFrom for implementation (#16688) [BEAM-12830] Print clearer go version fail message (#16693) Add reference to Books to Learning Resources in website Use ThreadLocal for DESERIALIZATION_CONTEXT (#16680) Co-authored-by: Kellen Dye Merge pull request #16682: Add reference to Books to Learning Resources in website Minor: Add apt update after adding deadsnakes repository in dev env (#16708) Merge pull request #16638 from robertwb/internal-counters Make num-stages counter into an internal counter. Merge pull request #16694 [BEAM-13800] Avoid packaging avro in the java harness jar. [BEAM-13807] Regenerate container images to get TF 2.8.0 (#16707) * Regenerate container images to get TF 2.8.0 * Add tf-estimator-nightly license URL [BEAM-13399, BEAM-13683] Eagerly materialize artifacts for automated expansion service, add feature to SQL transform (#16671) Merge pull request #16617 from [BEAM-13743] [Playground] Add context line for examples * Add context line for examples * Subtract number of tag lines from context_line * Update proto * Update licenses; Merge pull request #16618 from [BEAM-13744] [Playground] Add context line field to the tags * Add context_line to tags * Edit context_line`s in the tags * Edit info about default_example Merge pull request #16698 from [BEAM-13802][Playground] [Bugfix] Clean all build directroies, clean kubernetes env [BEAM-13293][BEAM-13806] Pipe a SchemaIO flag through Go integration test script. (#16705) This is needed for integration tests of Schema IO based xlang transforms, like JDBC or BigQuery. Merge pull request #16620 from [BEAM-13737][Playground] Local retrieving of example objects may fail with context deadline [BEAM-13737][Playground] Local retrieving of example objects may fail with context deadline [BEAM-13605] Modify groupby.apply implementation in preparation for pandas 1.4.0 (#16706) * Modify groupby.apply implementation in preparation for pandas 1.4.0 * fixup! Modify groupby.apply implementation in preparation for pandas 1.4.0 * Address review comments Merge pull request #16436 from [BEAM-1330] - DatastoreIO Writes should flush early when duplicate keys arrive * Switching to a HashSet to flush batch when duplicate is found * Keeping track of duplicates in a separate HashSet * clearing HashSet when flushing the data * Adding test for the duplicated entries scenario * Changing test implementation to use TestPipeline * Applying spotless * Executing tests with Mutations * Forcing the processing of the List of mutation in the same order the mutations were added to the original List * Moving the batchSize.update call to within the flushBatch method * Bumping to 7 since batchSize.update is now being called from within flushBatch [BEAM-13813] Add support for URL artifact to extractStagingToPath [BEAM-13663] Remove unused duplicate option for AWS client configuration (AWS Sdk v1). Remove Python SQL Test example from catalog [BEAM-13246] Add support for S3 Bucket Key at the object level (AWS Sdk v2). Fix timer consistency in direct runner [BEAM-13757] adds pane observation in DoFn (#16629) Merge pull request #16650:[BEAM-11971] Fix directrunner timer consistency Change links to Books from Amazon to Publisher [BEAM-13605] Add support for pandas 1.4.0 (#16590) * Addding new functions to / fixing doctests * Add _rename and value_counts() * Move import statement * Add if DataFrame has value_counts attr * Fix typo * Update precommit script and setup.py to 1.4 * Add backwards compatability for rename and replace * Add docstring and simplify kwargs * Skip DataFrame construction with series * Add change to CHANGES.md * Skip failing pyarrow test * Add pandas 1.4 to tox.ini Merge pull request #16718: [website] Change links to Books from Amazon to Publisher [BEAM-13761] adds Debezium IO wrapper for Go SDK (#16642) [BEAM-12976] Log projection pushdown optimizations. Merge pull request #16640 from kileys/java7tests [BEAM-12240] Add Jenkins tests for Java 17 [BEAM-13024] Unify PipelineOptions behavior (#16719) Update sdks/go/pkg/beam/artifact/materialize_test.go Co-authored-by: Robert Burke Merge pull request #16710 from ihji/BEAM-13813 [BEAM-13813] Add support for URL artifact to extractStagingToPath Merge pull request #16713 from [BEAM-13815] [Playground] [Bugfix] Remove Python SQL Test example from catalog [BEAM-13815] [Playground] [Bugfix] Remove Python SQL Test example from catalog Merge pull request #16605 from [BEAM-13634][Playground] Create a separate Dockerfile for the routing service * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Updates regarding comments * Update for environment_service_test.go * Docstring update for GetPrecompiledObjects * Add router services as separate container * too stage container builder * add empty string to build_playground_backend.yml * fix comments Co-authored-by: Artur Khanin Co-authored-by: akustov Co-authored-by: Ilya Merge pull request #16593 from [BEAM-13725][Playground] Add graph to the precompiled objects * [BEAM-13725][Playground] Add graph field to `PrecompiledObject` * [BEAM-13725][Playground] Fix test * [BEAM-137235][Playground] Remove `graph` from PrecompiledObject Add a new methods to receive graph of the PrecompiledObject. * [BEAM-13725][Playground] Update comments; Remove unnecessary field; * [BEAM-13725][Playground] Fix tests * [BEAM-13725][Playground] Regenerate proto files * [BEAM-13725][Playground] renaming * [BEAM-13725][Playground] Regenerate files * [BEAM-13725][Playground] Regenerate proto files Merge pull request #16699 from [BEAM-13789][Playground] Change logic of keeping examples to the bucket on CD side * [BEAM-13789][Playground] Change examples' folders structure Fix by `yapf` and `pylint` * [BEAM-13789][Playground] Change examples' folders structure Fix by `yapf` and `pylint` Fixed CSS for Case study page Merge pull request #16730: Fixed CSS for Case study page [BEAM-13203] Deprecate SnsIO.writeAsync for AWS Sdk v2 due to risk of data loss. Merge pull request #16712: [BEAM-13663] Remove unused duplicate option for AWS client configuration (AWS Sdk v1) Merge pull request #16711: [BEAM-13203] Deprecate SnsIO.writeAsync for AWS Sdk v2 due to risk of data loss. [BEAM-13828] Fix stale bot (#16734) Merge pull request #16364 from [BEAM-13182] Add diagrams to backend readme * Add diagrams to backend README.md * Fix comments * Fix comments [BEAM-13811] Fix save_main_session arg in tests examples (#16709) Update beam-master version Merge branch 'master' of github.com:apache/beam into beammaster [BEAM-13015] Calculate exception for closing BeamFnDataInboundObserver2 statically (#16537) * [BEAM-13015] Calculate exception for closing BeamFnDataInboundObserver2 statically. This avoids the expense of constructing the stack trace. Fix up CancellableQueue to respect comments that the first cancellation exception will be kept. Otherwise the close() call in the finally block could overwrite the more detailed cancellation exception. Previously this was masked in the test because the original exception was a cause of the generated close exception. * Update sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java Co-authored-by: Lukasz Cwik [BEAM-12572] Change jobs to run as cron jobs Minor doc tweaks for validating vendoring. (#16747) [BEAM-13686] OOM while logging a large pipeline even when logging level is higher (#16559) * Use not deprecated methods for logging a pipeline proto * Don't calculate potentially heavy string when the logging level is higher [BEAM-13629] Update URL artifact type for Dataflow Go (#16490) [BEAM-13832] Add automated expansion service start-up to JDBCio (#16739) [BEAM-13831] Add automated expansion service infra into Debezium Read() (#16738) Merge pull request #16742 from kileys/beammaster Update beam-master version [BEAM-13821] Add automated expansion service start-up to KafkaIO (#16716) [BEAM-13799] Created a Dataproc cluster manager for Interactive Beam Merge pull request #16727: [BEAM-11971] remove unsafe Concurrent data structure Merge pull request #16726 from [BEAM-12164]: Parses change streams fields as json / strings The backend is migrating from returning strings for certain fields to json. We need to change the parsing logic to accommodate for both until the migration is completed. [BEAM-13147] Avoid nullness issue during init of AwsModule (AWS Sdk v2) [BEAM-13839] Upgrade zstd-jni to version 1.5.2-1 [BEAM-13840] Fix usage of legacy rawtypes in AWS modules [BEAM-13820] Changed color of delete icon in pipeline options dropdown, removed unused imports Missing contribution Merge pull request #16752: [BEAM-13147] Avoid nullness issue during init of AwsModule (AWS Sdk v2) [BEAM-11971] Revert "Fix timer consistency in direct runner" (#16748) This reverts commit 8ea77f98376c396fa1b7002c98e26187bfe2d239. [BEAM-13193] Aggregates fn api outbound data/timers of different endpoints (#16439) [BEAM-13767] Migrate a bundle of grade tasks to use configuration avoidance API. (#16648) Merge pull request #16653 from [BEAM-12164]: Add integration tests for spanner change streams * feat: add experimental spanner readChangeStreams Adds the SpannerIO.readChangeStreams feature that will enable users to consume a change stream from Cloud Spanner. This feature is under preview now, and can only be used for allowlisted customers. When reading a change stream the users will be able to operate on a PCollection of DataChangeRecords, containing the modifications made to the database as well as the type of operation. * fix: remove public api exposure of opencensus Do not expose the Opencensus TraceSampler in the SpannerIO.readChangeStreams. This is done so that we can upgrade the opencensus library without having to concern ourselves with which version customers are using. This commit also removes the deserializer option since it is not used. * [BEAM-12164]: add integration tests for spanner change streams Adds integration tests to test the SpannerIO.readChangeStreams functionality. * fix: update InitializeDoFn javadocs Updates the InitializeDoFn javadocs to say the function is a DoFn, not a SDF. * refactor: remove unused configuration Removes the unused read timeout configuration from the test pipeline options. * test: recreate database on test setup Recreates the database when executing the Spanner change streams test. This way, we can have multiple tests executing simultaneously with a very little chance of clashing. * test: update default database prefix Updates the test database prefix to changestream so it is identifiable that the database is for this purpose. * test: add ordered key test * fix: fix linting violations * test: increases window size for test Increases the window size for test in order to reduce flakiness. * chore: remove deprecated todos / fixmes * chore: fix linting violations * chore: fix linting violations Co-authored-by: Zoe Cai Merge pull request #16728 from [BEAM-13823] Update docs for SnowflakeIO * [BEAM-13823] Update docs for SnowflakeIO * fixup! [BEAM-13823] Update docs for SnowflakeIO * fixup! fixup! [BEAM-13823] Update docs for SnowflakeIO Merge pull request #16746 from benWize/BEAM-12572-cron-jobs [BEAM-12572] Change python examples jobs to run periodically Merge pull request #16660 from [BEAM-13771][Playground] Send multifile value to the frontend * [BEAM-13771][Playground] Add multifile value to the response to the frontend * [BEAM-13771][Playground] Regenerate files * [BEAM-13771][Playground] Regenerate files Co-authored-by: Ilya Merge pull request #16646 from [BEAM-13643][Playground] Setup running backend to verify SCIO SDK examples * Add scio server * Merge master into BEAM-13643 * Revert "Merge master into BEAM-13643" This reverts commit bfb3f6c327b3574b29d140e4f12afba75ad7299f. * reformat + fix variables.tf Co-authored-by: akustov Co-authored-by: Ilya Kozyrev [BEAM-13015] Add state caching benchmark and move benchmarks to their own module. (#16542) * [BEAM-13015] Add state caching benchmark and move benchmarks to their own module. This simplifies adding unit tests and decreases the clutter in the sdks/java/harness module. * Make jmhTest run only one iteration without forking to speed it up. Also clean-up comments around args and fix classpath to only be runtime classpath. * increase timeout to 60 seconds Merge pull request #16724 from ibzib/log-opt [BEAM-12976] Log projection pushdown optimizations. [BEAM-13419] Check for initialization in dataflow runner (#16765) Merge pull request #16758 from [BEAM-13820] [Playground] Outline delete icon in Pipeline options [BEAM-13820] [Playground] Outline delete icon in Pipeline options Merge pull request #16701 from [BEAM-13786] [Playground] [Bugfix] Update CI/CD to verify only single-file examples * Update CI/CD to verify only single-file examples * Refactoring code * list comprehension to filter Merge pull request #16754 from [BEAM-13838][Playground] Add logs in case of empty graph for CD step * [BEAM-13838][Playground] Add logs for CD step * [BEAM-13838][Playground] Fix typos [BEAM-13293] consistent naming for expansion service address and flag update (#16764) Merge pull request #16700 from [BEAM-13790][Playground] Change logic of receiving examples from the bucket on backend side * [BEAM-13790][Playground] Update logic of receiving examples from bucket; Fix of setting up `link` for the `meta.file` * [BEAM-13790][Playground] Remove unused code Output successful rows from BQ Streaming Inserts [BEAM-13830] update dependency for debeziumio expansion service (#16743) * update dependency for expansion service * update dependency [BEAM-13761] consistent namings for expansion address in Debezium IO (#16766) [BEAM-13806] Shutting down SchemaIO expansion services from Go VR script. (#16770) [release-2.36.0] Update website/changelog for release 2.36.0 (#16627) [BEAM-13848] Update numpy intersphinx link (#16767) [release-23.6.0] Fix JIRA link for 2.36 blog (#16771) [BEAM-13647] Use role for Go worker binary. (#16729) Merge pull request #16756: [BEAM-13840] Fix usage of legacy rawtypes in AWS modules Merge pull request #16755: [BEAM-13839] Upgrade zstd-jni to version 1.5.2-1 BEAM-13439 Type annotation for ptransform_fn Because the return type of the ptransform_fn is not specified, [certain typecheckers](https://github.com/microsoft/pyright/issues/2688#issuecomment-990238720) will assume the type signature of functions decorated with it remain unchanged. However, ptransform_fn specifically adjusts the type signature such that the function no longer takes a pcoll. This can be corrected by changing the the definition of ptransform_fn from: ```python def ptransform_fn(fn): ``` to: ```python def ptransform_fn(fn) -> Callable[..., _PTransformFnPTransform]: This allows the typechecker to understand the return type of a function decorated with ptransform_fn will instead be a _PTransformFnPTransform type. ``` Additionally, for Python 3.10, [ParamSpec](https://www.python.org/dev/peps/pep-0612/#parameter-specification-variables) could be used to preserve the signature of the internal function as well. ``` from typing import ParamSpec, TypeVar, Concatenate P = ParamSpec("P") T = TypeVar("T") R = TypeVar("R") def ptransform_fn(fn: Callable[Concatenate[PCollection[T], P], R]) -> Callable[[P], _PTransformFnPTransform]: ... ``` [BEAM-13606] Fail bundles with failed BigTable mutations (#16751) * Fail bundles with failed BigTable mutations. Most Beam runners should retry such bundles. * Fix lint * Fix unittest BEAM-13439 Type annotation for ptransform_fn #16517 Merge pull request #16768 from Output successful rows from BQ Streaming Inserts Output successful rows from BQ Streaming Inserts [adhoc] Remove remaining usage of Powermock from aws2. fix broken links in jobs & remove the invalid ones Merge pull request #16714: [BEAM-13246] Add support for S3 Bucket Key at the object level (AWS Sdk v2) Update Dataflow Python dev container images. Merge pull request #16610 from MarcoRob/BEAM-12650 [BEAM-12650] Jenkins README updated with all existing tests Add java 17 to changes [BEAM-12914] Add missing 3.9 opcodes to type inference. (#16761) [BEAM-13321] Initial BigQueryIO externalization. (#16489) * [BEAM-13321] Initial BigQueryIO externalization. Create a SchemaIO implementation for BigQueryIO, to externalize it so it can be used with cross-language. This initial implementation covers the bare minimum, and still needs additional debugging (it seems to have trouble reading elements from some tables), so this is currently marked as experimental. * Fixup, null checks, and spotless * [BEAM-13321] Add documentation for defaults and an experimental warning. [BEAM-13193] Enable process bundle response elements embedding in Java SDK Harness (#16769) [BEAM-13830] added a debeziumio_expansion_addr flag to GoSDK (#16780) Apply spotless. (#16783) [BEAM-13732] Switch x-lang BigQueryIO expansion service to GCP one. This keeps our jars from inflating much. Having GCP on the SchemaIO exp. service adds ~60 MB to the jar, as opposed to around 1 MB to the existing GCP exp. service. Merge pull request #16782 from ibzib/py0208 Update Dataflow Python dev container images. [BEAM-13858] Fix broken github action on :sdks:go:examples:wordCount (#16785) add jira for runner v2 [BEAM-13732] Go SDK BigQuery IO wrapper. Initial implementation. (#16598) [BEAM-13732] Add example for Go BigQuery IO wrapper. (#16786) Also does some polish and fixup to the Kafka taxi example to fix little nits that were noticed while adding this new example. Merge pull request #16779 from kileys/changes Add Java 17 to CHANGES Update CHANGES.md with Go SDK milestones. (#16787) Co-authored-by: Daniel Oliveira Merge pull request #16784: [BEAM-13732] Switch x-lang BigQueryIO expansion service. [BEAM-13732] Switch x-lang BigQueryIO expansion service to GCP one. [BEAM-13193] Allow BeamFnDataOutboundObserver to flush elements. (#16778) Merge pull request #16777: [adhoc] Remove remaining usage of Powermock from aws2. [BEAM-13803] Add support for native iterable side inputs to the Go SDK (#16775) [BEAM-11095] Better error handling for illegal emit functions (#16776) Merge pull request #16613 from Supporting JdbcIO driver in classpath for x-lang * Supporting JdbcIO driver in classpath * Logging added * Adding test with two different DBs * Improving documentation * Addressing comments Merge pull request #15848 from [BEAM-13835] An any-type implementation for readWithPartitions for JdbcIO * Sketching an any-type implementation for readWithPartitions for JdbcIO * String, Long and DateTime partitioning set up * Adding jdbc to javaPostcommit * Adding support for auto-inference of ranges * Adding support for automatic partitioning * Tests for postgres and mysql * Adding support for Row output, improving documentation, addressing comments * Addressing comments * Adding unit tests for range partitioners * Reduced auto-inferred number of partitions * Reduced default inferred number of partitions * Remove wrong comment * Remove support for strings until BEAM-13846 * Fixing ITs and adding comments * Better strings in test * Address comment [BEAM-12920] Assume that bare generators types define simple generators. Add a container for Python 3.9. Allow job submission with Python 3.9 on Dataflow runner Add Python 3.9 test suites. Keep Dataflow V1 suites unchanged for now. Add py3.9 Github actions suites. Py39 Doc updates. [BEAM-9980] Simplify run_validates_container.sh to avoid branching. Update Cython to a new version that has py39 wheels. [BEAM-13845] Fix comparison with potentially incomparable default values. [BEAM-12920] Assume that bare generators types define simple generators. Mark Python 3.9 as supported version. [release-2.36.0][website] Fix github release notes script, header for release 2.36.0 (#16792) Use shell to run python for setupVirtualenv (#16796) Merge pull request #16759 from davidpr91/patch1 Add missing Bug and contributing user [BEAM-13830] Properly shut down Debezium expansion service in IT script. Merge pull request #16797: [BEAM-13830] Properly shut down Debezium expansion service in IT script. [BEAM-13830] Properly shut down Debezium expansion service in IT script. [BEAM-12920] Assume that bare generator types define simple generators (#16791) Merge pull request #16659 from [BEAM-13774][Playground] Add user to container * [BEAM-13774][Playground] Change Dockerfile for each supported SDK Separate entrypoint.sh and proxy.sh * [BEAM-13774][Playground] Update Dockerfiles; Add user for Scio SDK; * [BEAM-13774][Playground] small fixes * [BEAM-13774][Playground] Update Dockerfiles [BEAM-13868] Remove gsutil dep from hdfs IT test. [BEAM-13776][Playground] (#16731) Update `README.md` for the playground [BEAM-12000] Add Python 3.9 support. #16008 [BEAM-13867] Drop NaNs returned by nlargest in flight_delays example pipeline (#16801) Announce Python 3.9 in CHANGES.md (#16802) Set Dataflow container to release version. [release-2.37.0] CP a typo fix in job_PreCommit_Portable_Python.groovy (#16813) Cherry-picks https://github.com/apache/beam/pull/16812 [BEAM-13855] Skip SpannerChangeStreamOrderedWithinKeyIT and SpannerChangeStreamIT(#16799) (#16828) [BEAM-12164]: Fixes SpannerChangeStreamIT (#16806) (#16831) The GSON object was being initialized in a static context. When the DoFn that used it was called in another JVM, which did not initialized this object, we were getting a NPE exception. Here, we move the initialization of the GSON object inside the DoFn itself (through a setup method). Co-authored-by: Thiago Nunes [BEAM-13930] Address StateSpec consistency issue between Runner and Fn API. (#16852) The ability to mix and match runners and SDKs is accomplished through two portability layers: 1. The Runner API provides an SDK-and-runner-independent definition of a Beam pipeline 2. The Fn API allows a runner to invoke SDK-specific user-defined functions Apache Beam pipelines support executing stateful DoFns[1]. To support this execution the Runner API defines multiple user state specifications: * ReadModifyWriteStateSpec * BagStateSpec * OrderedListStateSpec * CombiningStateSpec * MapStateSpec * SetStateSpec The Fn API[2] defines APIs[3] to get, append and clear user state currently supporting a BagUserState and MultimapUserState protocol. Since there is no clear mapping between the Runner API and Fn API state specifications, there is no way for a runner to know that it supports a given API necessary to support the execution of the pipeline. The Runner will also have to manage additional runtime metadata associated with which protocol was used for a type of state so that it can successfully manage the state’s lifetime once it can be garbage collected. Please see the doc[4] for further details and a proposal on how to address this shortcoming. 1: https://beam.apache.org/blog/stateful-processing/ 2: https://github.com/apache/beam/blob/3ad05523f4cdf5122fc319276fcb461f768af39d/model/fn-execution/src/main/proto/beam_fn_api.proto#L742 3: https://s.apache.org/beam-fn-state-api-and-bundle-processing 4: http://doc/1ELKTuRTV3C5jt_YoBBwPdsPa5eoXCCOSKQ3GPzZrK7Q [release-2.37.0][BEAM-13694] Force hadoop-hdfs-client in hadoopVersion tests for hdfs (#16864) [release-2.37.0][BEAM-13919] Annotate PerKeyOrderingTest with UsesStatefulParDo (#16866) [BEAM-13919] Annotate PerKeyOrderingTest with UsesStatefulParDo. Co-authored-by: Kyle Weaver [cherry-pick][release-2.37.0][BEAM-13931] - make sure large rows cause BQIO to throw an error (#16862) * BEAM-13931 - make sure large rows cause BQIO to throw an error * prototyping new behavior * Add test for retryTransients [release-2.37.0][BEAM-13955] Fix pylint breakage from #16836 (#16868) [release-2.37.0][BEAM-13921] filter out debeziumIO test for spark runner (#16873) Co-authored-by: Ritesh Ghorse [release-2.37.0][BEAM-13959] Fix StorageApi writes when a field is named f (#16886) Co-authored-by: reuvenlax [cherry-pick][release-2.37.0][BEAM-12164]: fix bug when retrieving either string or json #16890 (#16900) struct.getValue() throws an error when getting a struct that contains a json inside. We circumvent this, by checking the type and calling either struct.getString() or struct.getJson(). Co-authored-by: Thiago Nunes [release-2.37.0] Fix BoundedQueueExecutor and StreamingDataflowWorker to actually limit memory from windmill (#16901) (#16941) Currently, because the queue is only limited by number of elements, there can be up to (num threads + queue size) elements outstanding at a time, which for large work items will almost certainly OOM the worker. This change both makes this limit explicit and adds a 50% JVM max memory limit on outstanding WorkItems to push back on windmill before workers run out of memory. Co-authored-by: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> [release-2.37.0][BEAM-12000] Build wheels for Python 3.9 #16964 (#16968) Co-authored-by: tvalentyn [release-2.37.0][BEAM-14005] Fix ignored exception in BatchSpannerRead (#16969) Failures to read from Spanner were ignored, and the "ok" serviceCallMwtric was updated before the read took place. Fix code and tests. Co-authored-by: Niel Markwick [release-2.37.0][BEAM-13980] Re-add method gone missing in af2f8ee6 (#16918) (#16967) Co-authored-by: Janek Bevendorff release-2.35.0-twttr1 (cherry picked from commit 6f816a21b593e1c1218378564cf4e3e8613f5443) Decorrelate state cleanup timers (cherry picked from commit ef74f252a8c133bf771294af1745ed181c0f6fda) Don't publish test jars (cherry picked from commit 40e88cc9349e32dee7da5b17dfec7cfc2322f386) Unmerged Dataflow Runner Enhancements (cherry picked from commit 3435610cdda9d4e6e325c44e555e39c09d916ffa) Differential Revision: https://phabricator.twitter.biz/D852780 --- .../beam/gradle/BeamModulePlugin.groovy | 1 + .../dataflow/WorkerMetricsReceiver.java | 29 ++ .../options/DataflowPipelineDebugOptions.java | 20 + .../worker/build.gradle | 3 + .../worker/legacy-worker/build.gradle | 8 +- .../dataflow/worker/BatchDataflowWorker.java | 30 +- .../worker/BatchModeExecutionContext.java | 12 + .../worker/DataflowSystemMetrics.java | 38 +- .../MetricTrackingWindmillServerStub.java | 22 +- .../worker/StreamingDataflowWorker.java | 461 +++++++++++++++--- .../dataflow/worker/WindmillStateCache.java | 23 +- .../worker/WindmillStateInternals.java | 2 +- .../dataflow/worker/WindmillStateReader.java | 23 +- .../dataflow/worker/WorkItemStatusClient.java | 24 + .../worker/counters/CounterFactory.java | 147 +++--- .../dataflow/worker/util/MemoryMonitor.java | 37 +- .../DataflowElementExecutionTrackerTest.java | 7 +- .../worker/WindmillStateReaderTest.java | 13 +- .../worker/counters/CounterFactoryTest.java | 8 +- 19 files changed, 737 insertions(+), 171 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/WorkerMetricsReceiver.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8c398845dd7b5..14ee3bfc7772d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -611,6 +611,7 @@ class BeamModulePlugin implements Plugin { http_client : "org.apache.httpcomponents:httpclient:$httpclient_version", http_core : "org.apache.httpcomponents:httpcore:$httpcore_version", influxdb_library : "org.influxdb:influxdb-java:$influxdb_version", + hdrhistogram : "org.hdrhistogram:HdrHistogram:2.1.11", jackson_annotations : "com.fasterxml.jackson.core:jackson-annotations:$jackson_version", jackson_jaxb_annotations : "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:$jackson_version", jackson_core : "com.fasterxml.jackson.core:jackson-core:$jackson_version", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/WorkerMetricsReceiver.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/WorkerMetricsReceiver.java new file mode 100644 index 0000000000000..871671df85885 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/WorkerMetricsReceiver.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.api.services.dataflow.model.CounterUpdate; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; + +@Experimental +public interface WorkerMetricsReceiver { + void receiverCounterUpdates(List updates); + + default void receiveTopologyUpdate(String systemName, String userName) {} +} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 2995a485527cd..cf4b1a2498d2f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -166,6 +166,26 @@ public Dataflow create(PipelineOptions options) { void setWindmillServicePort(int value); + @Default.Integer(1) + int getNumCommitStreams(); + + void setNumCommitStreams(int value); + + @Default.Integer(1) + int getNumGetDataStreams(); + + void setNumGetDataStreams(int value); + + @Default.Integer(10) + int getNumGetDataThreads(); + + void setNumGetDataThreads(int value); + + @Default.Integer(1) + int getNumDispatchThreads(); + + void setNumDispatchThreads(int value); + /** * Number of threads to use on the Dataflow worker harness. If left unspecified, the Dataflow * service will compute an appropriate number of threads to use. diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index da210b4617ff9..c1cfff30d1381 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -89,7 +89,10 @@ dependencies { permitUnusedDeclared library.java.google_api_client // BEAM-11761 implementation library.java.google_auth_library_credentials implementation library.java.google_http_client + implementation library.java.google_http_client_jackson2 implementation library.java.google_http_client_gson + implementation library.java.guava + implementation library.java.hdrhistogram implementation library.java.jackson_annotations implementation library.java.jackson_core implementation library.java.jackson_databind diff --git a/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle b/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle index 1050c66d14cbf..601140416ea1a 100644 --- a/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle @@ -81,7 +81,8 @@ def excluded_dependencies = [ applyJavaNature( archivesBaseName: 'beam-runners-google-cloud-dataflow-java-legacy-worker', - publish: false, + automaticModuleName: 'org.apache.beam.runners.dataflow.worker', + publish: true, classesTriggerCheckerBugs: [ 'BatchGroupAlsoByWindowAndCombineFn': 'TODO: file a bug report', 'AssignWindowsParDoFnFactory': 'TODO: file a bug report', @@ -110,6 +111,10 @@ applyJavaNature( dependencies { include(dependency(library.java.slf4j_jdk14)) } + + dependencies { + include(dependency(library.java.hdrhistogram)) + } dependencies { include(project(path: ":model:fn-execution", configuration: "shadow")) @@ -223,6 +228,7 @@ dependencies { implementation "org.eclipse.jetty:jetty-server:9.2.10.v20150310" implementation "org.eclipse.jetty:jetty-servlet:9.2.10.v20150310" implementation library.java.avro + implementation library.java.hdrhistogram implementation library.java.jackson_annotations implementation library.java.jackson_core implementation library.java.jackson_databind diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java index cb697c1a8c760..07f53663607bc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.dataflow.worker; +import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.*; +import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_IS_THRASHING; +import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_NUM_PUSHBACKS; + import com.google.api.services.dataflow.model.MapTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.Closeable; @@ -30,6 +34,8 @@ import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness; import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions; +import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction; import org.apache.beam.runners.dataflow.worker.graph.CreateExecutableStageNodeFunction; @@ -147,9 +153,15 @@ public class BatchDataflowWorker implements Closeable { private final SdkHarnessRegistry sdkHarnessRegistry; private final Function> mapTaskToNetwork; + private final CounterSet memoryCounter; private final MemoryMonitor memoryMonitor; private final Thread memoryMonitorThread; + private static final CounterName BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME = + CounterName.named("work_item_success"); + private static final CounterName BATCH_WORK_ITEM_FAILURE_COUNTER_NAME = + CounterName.named("work_item_failure"); + /** * Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java * execution. @@ -212,7 +224,12 @@ protected BatchDataflowWorker( .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) .build(); - this.memoryMonitor = MemoryMonitor.fromOptions(options); + this.memoryCounter = new CounterSet(); + this.memoryMonitor = + MemoryMonitor.fromOptions( + options, + memoryCounter.longSum(MEMORY_MONITOR_NUM_PUSHBACKS.counterName()), + memoryCounter.longSum(MEMORY_MONITOR_IS_THRASHING.counterName())); this.statusPages = WorkerStatusPages.create( DEFAULT_STATUS_PORT, this.memoryMonitor, sdkHarnessRegistry::sdkHarnessesAreHealthy); @@ -327,6 +344,11 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr DataflowWorkExecutor worker = null; SdkWorkerHarness sdkWorkerHarness = sdkHarnessRegistry.getAvailableWorkerAndAssignWork(); + CounterSet counterSet = new CounterSet(); + Counter workItemsReceived = counterSet.longSum(WORK_ITEMS_RECEIVED.counterName()); + Counter workItemSuccess = counterSet.longSum(BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME); + Counter workItemFailure = counterSet.longSum(BATCH_WORK_ITEM_FAILURE_COUNTER_NAME); + try { // Populate PipelineOptions with data from work unit. options.setProject(workItem.getProjectId()); @@ -340,10 +362,10 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr throw new RuntimeException("Unknown kind of work item: " + workItem.toString()); } - CounterSet counterSet = new CounterSet(); BatchModeExecutionContext executionContext = BatchModeExecutionContext.create( counterSet, + this.memoryCounter, sideInputDataCache, sideInputWeakReferenceCache, readerRegistry, @@ -386,11 +408,15 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr DataflowWorkProgressUpdater progressUpdater = new DataflowWorkProgressUpdater(workItemStatusClient, workItem, worker, options); + + workItemsReceived.addValue(1L); executeWork(worker, progressUpdater); + workItemSuccess.addValue(1L); workItemStatusClient.reportSuccess(); return true; } catch (Throwable e) { + workItemFailure.addValue(1L); workItemStatusClient.reportError(e); return false; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 19670f47fc8e1..45c9d1629c1cc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -34,6 +34,8 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; +import org.apache.beam.runners.dataflow.worker.counters.CounterSet; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; @@ -64,6 +66,7 @@ public class BatchModeExecutionContext protected final Cache logicalReferenceCache; protected final PipelineOptions options; protected final ReaderFactory readerFactory; + private final CounterSet memoryDeltaCounter; private Object key; private final MetricsContainerRegistry containerRegistry; @@ -81,6 +84,7 @@ public class BatchModeExecutionContext private BatchModeExecutionContext( CounterFactory counterFactory, + CounterSet memoryDeltaCounter, Cache> dataCache, Cache logicalReferenceCache, ReaderFactory readerFactory, @@ -93,6 +97,7 @@ private BatchModeExecutionContext( executionStateTracker, executionStateRegistry, Long.MAX_VALUE); + this.memoryDeltaCounter = memoryDeltaCounter; this.logicalReferenceCache = logicalReferenceCache; this.readerFactory = readerFactory; this.options = options; @@ -115,6 +120,7 @@ public static BatchModeExecutionContext forTesting( BatchModeExecutionStateRegistry stateRegistry = new BatchModeExecutionStateRegistry(); return new BatchModeExecutionContext( counterFactory, + new CounterSet(), CacheBuilder.newBuilder() .maximumWeight(1_000_000) // weights are in bytes .weigher(Weighers.fixedWeightKeys(8)) @@ -224,6 +230,7 @@ protected DataflowOperationContext.DataflowExecutionState createState( public static BatchModeExecutionContext create( CounterFactory counterFactory, + CounterSet memoryCounter, Cache> dataCache, Cache logicalReferenceCache, ReaderFactory readerFactory, @@ -233,6 +240,7 @@ public static BatchModeExecutionContext create( BatchModeExecutionStateRegistry executionStateRegistry = new BatchModeExecutionStateRegistry(); return new BatchModeExecutionContext( counterFactory, + memoryCounter, dataCache, logicalReferenceCache, readerFactory, @@ -327,6 +335,10 @@ public Cache getLogicalReferenceCache() { return rval; } + public Iterable extractMemoryCounters() { + return memoryDeltaCounter.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE); + } + /** {@link DataflowStepContext} used in batch mode. */ public class StepContext extends DataflowExecutionContext.DataflowStepContext { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java index 0dae356b754b6..086678a5d034b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java @@ -40,7 +40,33 @@ public enum StreamingSystemCounterNames { JAVA_HARNESS_MAX_MEMORY("dataflow_java_harness_max_memory"), JAVA_HARNESS_RESTARTS("dataflow_java_harness_restarts"), WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"), - MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing"); + MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing"), + STATE_CACHE_WEIGHT("state_cache_weight"), + STATE_CACHE_MAX_WEIGHT("state_cache_max_weight"), + STATE_CACHE_SIZE("state_cache_size"), + STATE_CACHE_HITS("state_cache_hits"), + STATE_CACHE_REQUESTS("state_cache_requests"), + STATE_CACHE_HIT_RATE("state_cache_hit_rate"), + STATE_CACHE_EVICTIONS("state_cache_evictions"), + STATE_CACHE_INVALIDATE_REQUESTS("state_cache_invalidate_requests"), + STATE_CACHE_INVALIDATES_FROM_INCONSISTENT_TOKEN( + "state_cache_invalidates_from_inconsistent_token"), + STATE_CACHE_STALE_WORK_TOKEN_MISSES("state_cache_stale_work_token_misses"), + COMMIT_DURATION_MS("commit_duration_ms"), + COMMIT_SIZE_BYTES_PER_COMMIT("commit_size_bytes_per_commit"), + COMMIT_SIZE_BYTES("commit_size_bytes"), + CURRENT_COMMIT_SIZE_BYTES("current_commit_size_bytes"), + WORK_ITEMS_RECEIVED("work_items_received"), + GET_WORK_ITEM_BATCHES_RECEIVED("get_work_item_batches_received"), + WORK_ITEMS_PER_BATCH("work_items_per_batch"), + COMPUTATION_WORK_ITEMS_RECEIVED("computation_work_items_received"), + GET_WORK_ITEM_WAIT_TIME_MS("get_work_item_wait_time_ms"), + STATE_FETCH_BATCHES("state_fetch_batches"), + STATE_FETCH_BATCH_SIZE("state_fetch_batch_size"), + STATE_FETCH_LATENCY_MS("state_fetch_latency_ms"), + MEMORY_MONITOR_IS_THRASHING("memory_monitor_is_thrashing"), + MEMORY_MONITOR_NUM_PUSHBACKS("memory_monitor_num_pushbacks"), + ; private final String name; @@ -72,7 +98,15 @@ public enum StreamingPerStageSystemCounterNames { * This is based on user updated metric "throttled-msecs", reported as part of system metrics so * that streaming autoscaler can access it. */ - THROTTLED_MSECS("dataflow_throttled_msecs"); + THROTTLED_MSECS("dataflow_throttled_msecs"), + + STATE_FETCHES("state_fetches_per_stage"), + + STATE_FETCH_LATENCY("state_fetch_latency_per_stage"), + + INPUT_WATERMARK_LAG("input_watermark_lag_ms"), + + OUTPUT_WATERMARK_LAG("output_watermark_lag_ms"); private final String namePrefix; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java index d2eff46ef770b..0331dc2e0df62 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java @@ -39,8 +39,10 @@ * requests and throttles requests when memory pressure is high. * *

External API: individual worker threads request state for their computation via {@link - * #getStateData}. However, requests are either issued using a pool of streaming rpcs or possibly - * batched requests. + * #getStateData}. However, we want to batch requests to WMS rather than calling for each thread, so + * calls actually just enqueue a state request in the local queue, which will be handled by up to + * {@link #numThreads} polling that queue and making requests to WMS in batches of size {@link + * #MAX_READS_PER_BATCH}. */ @SuppressWarnings({ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) @@ -53,6 +55,8 @@ public class MetricTrackingWindmillServerStub { private final WindmillServerStub server; private final MemoryMonitor gcThrashingMonitor; private final boolean useStreamingRequests; + private final int numStreams; + private final int numThreads; private static final class ReadBatch { ArrayList reads = new ArrayList<>(); @@ -68,8 +72,6 @@ private static final class ReadBatch { private WindmillServerStub.StreamPool streamPool; private static final int MAX_READS_PER_BATCH = 60; - private static final int MAX_ACTIVE_READS = 10; - private static final int NUM_STREAMS = 1; private static final Duration STREAM_TIMEOUT = Duration.standardSeconds(30); private static final class QueueEntry { @@ -89,7 +91,13 @@ private static final class QueueEntry { } public MetricTrackingWindmillServerStub( - WindmillServerStub server, MemoryMonitor gcThrashingMonitor, boolean useStreamingRequests) { + WindmillServerStub server, + MemoryMonitor gcThrashingMonitor, + boolean useStreamingRequests, + int numStreams, + int numThreads) { + this.numStreams = numStreams; + this.numThreads = numThreads; this.server = server; this.gcThrashingMonitor = gcThrashingMonitor; // This is used as a queue but is expected to be less than 10 batches. @@ -101,7 +109,7 @@ public void start() { if (useStreamingRequests) { streamPool = new WindmillServerStub.StreamPool<>( - NUM_STREAMS, STREAM_TIMEOUT, this.server::getDataStream); + numStreams, STREAM_TIMEOUT, this.server::getDataStream); } } @@ -112,7 +120,7 @@ public void start() { private @Nullable ReadBatch addToReadBatch(QueueEntry entry) { synchronized (this) { ReadBatch batch; - if (activeReadThreads < MAX_ACTIVE_READS) { + if (activeReadThreads < numThreads) { assert (pendingReadBatches.isEmpty()); activeReadThreads += 1; // fall through to below synchronized block diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c49865aa7511c..7e7f78f70598f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; +import com.google.api.services.dataflow.model.ParallelInstruction; import com.google.api.services.dataflow.model.Status; import com.google.api.services.dataflow.model.StreamingComputationConfig; import com.google.api.services.dataflow.model.StreamingConfigTask; @@ -58,8 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import javax.servlet.http.HttpServletRequest; @@ -70,6 +73,7 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.WorkerMetricsReceiver; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.util.CloudObject; @@ -81,6 +85,7 @@ import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry; import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions; import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.CounterUpdateAggregators; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; @@ -133,6 +138,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.TextFormat; @@ -152,6 +158,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTimeUtils; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -191,6 +198,11 @@ public class StreamingDataflowWorker { // Maximum number of threads for processing. Currently each thread processes one key at a time. static final int MAX_PROCESSING_THREADS = 300; static final long THREAD_EXPIRATION_TIME_SEC = 60; + + // Maximum work units retrieved from Windmill and queued before processing. Limiting this delays + // retrieving extra work from Windmill without working on it, leading to better + // prioritization / utilization. + static final int MAX_WORK_UNITS_QUEUED = 500; static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB static final int NUM_COMMIT_STREAMS = 1; @@ -365,10 +377,10 @@ public int size() { // Value class for a queued commit. static class Commit { - - private Windmill.WorkItemCommitRequest request; - private ComputationState computationState; - private Work work; + private final Windmill.WorkItemCommitRequest request; + private final ComputationState computationState; + private final Work work; + private final Instant commitCreateTime; public Commit( Windmill.WorkItemCommitRequest request, ComputationState computationState, Work work) { @@ -376,6 +388,7 @@ public Commit( assert request.getSerializedSize() > 0; this.computationState = computationState; this.work = work; + this.commitCreateTime = Instant.now(); } public Windmill.WorkItemCommitRequest getRequest() { @@ -393,6 +406,10 @@ public Work getWork() { public int getSize() { return request.getSerializedSize(); } + + public Instant getCommitCreateTime() { + return commitCreateTime; + } } // Maps from computation ids to per-computation state. @@ -419,9 +436,10 @@ public int getSize() { private final ThreadFactory threadFactory; private DataflowMapTaskExecutorFactory mapTaskExecutorFactory; private final BoundedQueueExecutor workUnitExecutor; + private final BoundedQueueExecutor dispatchExecutor; private final WindmillServerStub windmillServer; - private final Thread dispatchThread; - private final Thread commitThread; + private final Thread[] dispatchThreads; + private final Thread[] commitThreads; private final AtomicLong activeCommitBytes = new AtomicLong(); private final AtomicBoolean running = new AtomicBoolean(); private final StateFetcher stateFetcher; @@ -448,8 +466,28 @@ public int getSize() { private final Counter javaHarnessMaxMemory; private final Counter windmillMaxObservedWorkItemCommitBytes; private final Counter memoryThrashing; + private final Counter stateCacheWeight; + private final Counter stateCacheSize; + private final Counter stateCacheMaxWeight; + private final Counter stateCacheEvictions; + private final Counter stateCacheHits; + private final Counter stateCacheRequests; + private final Counter stateCacheHitRate; + private final Counter stateCacheInvalidateRequests; + private final Counter stateCacheInvalidatesFromInconsistentToken; + private final Counter stateCacheStaleWorkTokenMisses; + private final Counter commitDurationMs; + private final Counter commitSizeBytesPerCommit; + private final Counter commitSizeBytes; + private final Counter currentCommitSizeBytes; + private final Counter workItemsReceived; + private final Counter getWorkItemBatchesReceived; + private final Counter computationWorkItemsReceived; + private final Counter getWorkItemWaitTimeMs; + private final Counter itemsPerStateFetch; private Timer refreshWorkTimer; private Timer statusPageTimer; + private Timer resetWatermarkMetricsTimer; private final boolean publishCounters; private Timer globalWorkerUpdatesTimer; @@ -493,6 +531,7 @@ public int getSize() { private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry(); private HotKeyLogger hotKeyLogger; + private final Iterable workerMetricReceivers; /** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */ private static class StageInfo { @@ -502,17 +541,49 @@ private static class StageInfo { final MetricsContainerRegistry metricsContainerRegistry; final StreamingModeExecutionStateRegistry executionStateRegistry; final CounterSet deltaCounters; + final CounterSet cumulativeCounters; final Counter throttledMsecs; final Counter totalProcessingMsecs; final Counter timerProcessingMsecs; + final Counter stateFetches; + final Counter stateFetchLatency; + final Counter inputWatermarkLag; + final Counter outputWatermarkLag; + + private long lastWatermarkUpdate = 0L; + + void updateWatermarkMetrics(Instant inputDataWatermark, Instant outputDataWatermark) { + long nowMillis = DateTimeUtils.currentTimeMillis(); + long watermarkLag = Math.max(0, nowMillis - inputDataWatermark.getMillis()); + inputWatermarkLag.getAndReset(); + inputWatermarkLag.addValue(watermarkLag); + + if (outputDataWatermark != null) { + long outputWatermarkLagMillis = Math.max(0, nowMillis - outputDataWatermark.getMillis()); + outputWatermarkLag.getAndReset(); + outputWatermarkLag.addValue(outputWatermarkLagMillis); + } + + lastWatermarkUpdate = nowMillis; + } - StageInfo(String stageName, String systemName, StreamingDataflowWorker worker) { + void resetStaleWatermarkMetrics() { + long nowMillis = DateTimeUtils.currentTimeMillis(); + if (nowMillis - lastWatermarkUpdate > 60_000 * 5) { + inputWatermarkLag.getAndReset(); + outputWatermarkLag.getAndReset(); + } + } + + StageInfo( + String stageName, String systemName, String userName, StreamingDataflowWorker worker) { this.stageName = stageName; this.systemName = systemName; metricsContainerRegistry = StreamingStepMetricsContainer.createRegistry(); executionStateRegistry = new StreamingModeExecutionStateRegistry(worker); - NameContext nameContext = NameContext.create(stageName, null, systemName, null); + NameContext nameContext = NameContext.create(stageName, null, systemName, userName); deltaCounters = new CounterSet(); + cumulativeCounters = new CounterSet(); throttledMsecs = deltaCounters.longSum( StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(nameContext)); @@ -522,6 +593,18 @@ private static class StageInfo { timerProcessingMsecs = deltaCounters.longSum( StreamingPerStageSystemCounterNames.TIMER_PROCESSING_MSECS.counterName(nameContext)); + stateFetches = + deltaCounters.longSum( + StreamingPerStageSystemCounterNames.STATE_FETCHES.counterName(nameContext)); + stateFetchLatency = + deltaCounters.distribution( + StreamingPerStageSystemCounterNames.STATE_FETCH_LATENCY.counterName(nameContext)); + inputWatermarkLag = + cumulativeCounters.longSum( + StreamingPerStageSystemCounterNames.INPUT_WATERMARK_LAG.counterName(nameContext)); + outputWatermarkLag = + cumulativeCounters.longSum( + StreamingPerStageSystemCounterNames.OUTPUT_WATERMARK_LAG.counterName(nameContext)); } List extractCounterUpdates() { @@ -535,6 +618,8 @@ List extractCounterUpdates() { } counterUpdates.addAll( deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE)); + counterUpdates.addAll( + cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); return counterUpdates; } @@ -614,10 +699,17 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( this.sdkHarnessRegistry = sdkHarnessRegistry; this.hotKeyLogger = hotKeyLogger; this.windmillServiceEnabled = options.isEnableStreamingEngine(); - this.memoryMonitor = MemoryMonitor.fromOptions(options); + this.memoryMonitor = + MemoryMonitor.fromOptions( + options, + this.pendingDeltaCounters.longSum( + StreamingSystemCounterNames.MEMORY_MONITOR_NUM_PUSHBACKS.counterName()), + this.pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.MEMORY_MONITOR_IS_THRASHING.counterName())); this.statusPages = WorkerStatusPages.create( DEFAULT_STATUS_PORT, memoryMonitor, sdkHarnessRegistry::sdkHarnessesAreHealthy); + this.workerMetricReceivers = ReflectHelpers.loadServicesOrdered(WorkerMetricsReceiver.class); if (windmillServiceEnabled) { this.debugCaptureManager = new DebugCapture.Manager(options, statusPages.getDebugCapturePages()); @@ -646,14 +738,86 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( this.memoryThrashing = pendingCumulativeCounters.intSum( StreamingSystemCounterNames.MEMORY_THRASHING.counterName()); + this.stateCacheWeight = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_WEIGHT.counterName()); + this.stateCacheMaxWeight = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_MAX_WEIGHT.counterName()); + this.stateCacheSize = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_SIZE.counterName()); + this.stateCacheHits = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_HITS.counterName()); + this.stateCacheRequests = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_REQUESTS.counterName()); + this.stateCacheHitRate = + pendingCumulativeCounters.doubleSum( + StreamingSystemCounterNames.STATE_CACHE_HIT_RATE.counterName()); + this.stateCacheEvictions = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_EVICTIONS.counterName()); + this.stateCacheInvalidateRequests = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_INVALIDATE_REQUESTS.counterName()); + this.stateCacheInvalidatesFromInconsistentToken = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_INVALIDATES_FROM_INCONSISTENT_TOKEN + .counterName()); + this.stateCacheStaleWorkTokenMisses = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.STATE_CACHE_STALE_WORK_TOKEN_MISSES.counterName()); + this.commitDurationMs = + pendingDeltaCounters.distribution( + StreamingSystemCounterNames.COMMIT_DURATION_MS.counterName()); + this.commitSizeBytesPerCommit = + pendingDeltaCounters.distribution( + StreamingSystemCounterNames.COMMIT_SIZE_BYTES_PER_COMMIT.counterName()); + this.commitSizeBytes = + pendingDeltaCounters.longSum(StreamingSystemCounterNames.COMMIT_SIZE_BYTES.counterName()); + this.currentCommitSizeBytes = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.CURRENT_COMMIT_SIZE_BYTES.counterName()); + this.getWorkItemBatchesReceived = + pendingDeltaCounters.longSum( + StreamingSystemCounterNames.GET_WORK_ITEM_BATCHES_RECEIVED.counterName()); + this.workItemsReceived = + pendingDeltaCounters.longSum(StreamingSystemCounterNames.WORK_ITEMS_RECEIVED.counterName()); + this.computationWorkItemsReceived = + pendingDeltaCounters.longSum( + StreamingSystemCounterNames.COMPUTATION_WORK_ITEMS_RECEIVED.counterName()); + this.getWorkItemWaitTimeMs = + pendingDeltaCounters.longSum( + StreamingSystemCounterNames.GET_WORK_ITEM_WAIT_TIME_MS.counterName()); + this.itemsPerStateFetch = + pendingDeltaCounters.distribution( + StreamingSystemCounterNames.STATE_FETCH_BATCH_SIZE.counterName()); this.isDoneFuture = new CompletableFuture<>(); + this.stateCacheMaxWeight.addValue((long) options.getWorkerCacheMb() * 1024 * 1024); + this.threadFactory = r -> { Thread t = new Thread(r); t.setDaemon(true); return t; }; + + ThreadFactory dispatchThreadFactory = + new ThreadFactory() { + private final AtomicInteger threadId = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("DispatchExecutor-" + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + }; + this.workUnitExecutor = new BoundedQueueExecutor( chooseMaximumNumberOfThreads(), @@ -662,7 +826,9 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( chooseMaximumBundlesOutstanding(), chooseMaximumBytesOutstanding(), threadFactory); - + this.dispatchExecutor = + new BoundedQueueExecutor( + 1, THREAD_EXPIRATION_TIME_SEC, TimeUnit.SECONDS, 3, 1000, dispatchThreadFactory); maxSinkBytes = hasExperiment(options, "disable_limiting_bundle_sink_bytes") ? Long.MAX_VALUE @@ -672,42 +838,64 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( memoryMonitorThread.setPriority(Thread.MIN_PRIORITY); memoryMonitorThread.setName("MemoryMonitor"); - dispatchThread = - threadFactory.newThread( - new Runnable() { - @Override - public void run() { - LOG.info("Dispatch starting"); - if (windmillServiceEnabled) { - streamingDispatchLoop(); - } else { - dispatchLoop(); + dispatchThreads = new Thread[options.getNumDispatchThreads()]; + for (int t = 0; t < options.getNumDispatchThreads(); t++) { + Thread dispatchThread = + threadFactory.newThread( + new Runnable() { + @Override + public void run() { + LOG.info("Dispatch starting"); + if (windmillServiceEnabled) { + int sleepTime = ThreadLocalRandom.current().nextInt(1000, 2000); + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + streamingDispatchLoop(); + } else { + dispatchLoop(); + } + LOG.info("Dispatch done"); } - LOG.info("Dispatch done"); - } - }); - dispatchThread.setPriority(Thread.MIN_PRIORITY); - dispatchThread.setName("DispatchThread"); - - commitThread = - threadFactory.newThread( - new Runnable() { - @Override - public void run() { - if (windmillServiceEnabled) { - streamingCommitLoop(); - } else { - commitLoop(); + }); + // dispatchThread.setPriority(Thread.MIN_PRIORITY); + dispatchThread.setName("DispatchThread-" + t); + dispatchThreads[t] = dispatchThread; + } + + int numCommitThreads; + if (windmillServiceEnabled) { + numCommitThreads = 1; + } else { + numCommitThreads = options.getNumCommitStreams(); + } + + commitThreads = new Thread[numCommitThreads]; + for (int i = 0; i < numCommitThreads; i++) { + Thread t = + threadFactory.newThread( + new Runnable() { + @Override + public void run() { + if (windmillServiceEnabled) { + streamingCommitLoop(options.getNumCommitStreams()); + } else { + commitLoop(); + } } - } - }); - commitThread.setPriority(Thread.MAX_PRIORITY); - commitThread.setName("CommitThread"); + }); + t.setPriority(Thread.MAX_PRIORITY); + t.setName("CommitThread-" + i); + commitThreads[i] = t; + } this.publishCounters = publishCounters; this.windmillServer = options.getWindmillServerStub(); this.metricTrackingWindmillServer = - new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled); + new MetricTrackingWindmillServerStub( + windmillServer, + memoryMonitor, + windmillServiceEnabled, + options.getNumGetDataStreams(), + options.getNumGetDataThreads()); this.metricTrackingWindmillServer.start(); this.stateFetcher = new StateFetcher(metricTrackingWindmillServer); this.clientId = clientIdGenerator.nextLong(); @@ -820,14 +1008,29 @@ public boolean workExecutorIsEmpty() { public void start() { running.set(true); + resetWatermarkMetricsTimer = new Timer("ResetWatermarks"); + resetWatermarkMetricsTimer.schedule( + new TimerTask() { + @Override + public void run() { + stageInfoMap.values().forEach(StageInfo::resetStaleWatermarkMetrics); + } + }, + 60_000, + 60_000); + if (windmillServiceEnabled) { // Schedule the background getConfig thread. Blocks until windmillServer stub is ready. schedulePeriodicGlobalConfigRequests(); } memoryMonitorThread.start(); - dispatchThread.start(); - commitThread.start(); + for (Thread dispatchThread : dispatchThreads) { + dispatchThread.start(); + } + for (Thread commitThread : commitThreads) { + commitThread.start(); + } ExecutionStateSampler.instance().start(); // Periodically report workers counters and other updates. @@ -950,12 +1153,17 @@ public void stop() { debugCaptureManager.stop(); } running.set(false); - dispatchThread.interrupt(); - dispatchThread.join(); + for (Thread dispatchThread : dispatchThreads) { + dispatchThread.interrupt(); + dispatchThread.join(); + } // We need to interrupt the commitThread in case it is blocking on pulling // from the commitQueue. - commitThread.interrupt(); - commitThread.join(); + for (Thread commitThread : commitThreads) { + commitThread.interrupt(); + commitThread.join(); + } + memoryMonitor.stop(); memoryMonitorThread.join(); workUnitExecutor.shutdown(); @@ -1038,9 +1246,27 @@ private void dispatchLoop() { } catch (WindmillServerStub.RpcException e) { LOG.warn("GetWork failed, retrying:", e); } + getWorkItemWaitTimeMs.addValue((long) backoff); sleep(backoff); backoff = Math.min(1000, backoff * 2); } while (running.get()); + + dispatchExecutor.execute(new DispatchWorkRunnable(workResponse), MAX_GET_WORK_FETCH_BYTES); + } + } + + private class DispatchWorkRunnable implements Runnable { + private final Windmill.GetWorkResponse workResponse; + + DispatchWorkRunnable(Windmill.GetWorkResponse workResponse) { + this.workResponse = workResponse; + } + + @Override + public void run() { + workItemsReceived.addValue((long) workResponse.getWorkCount()); + getWorkItemBatchesReceived.addValue(1L); + for (final Windmill.ComputationWorkItems computationWork : workResponse.getWorkList()) { final String computationId = computationWork.getComputationId(); final ComputationState computationState = getComputationState(computationId); @@ -1058,6 +1284,8 @@ private void dispatchLoop() { final @Nullable Instant synchronizedProcessingTime = WindmillTimeUtils.windmillToHarnessWatermark( computationWork.getDependentRealtimeInputWatermark()); + + computationWorkItemsReceived.addValue((long) computationWork.getWorkCount()); for (final Windmill.WorkItem workItem : computationWork.getWorkList()) { scheduleWorkItem( computationState, inputDataWatermark, synchronizedProcessingTime, workItem); @@ -1079,12 +1307,10 @@ void streamingDispatchLoop() { Instant inputDataWatermark, Instant synchronizedProcessingTime, Windmill.WorkItem workItem) -> { - memoryMonitor.waitForResources("GetWork"); - scheduleWorkItem( - getComputationState(computation), - inputDataWatermark, - synchronizedProcessingTime, - workItem); + DispatchStreamingWorkRunnable r = + new DispatchStreamingWorkRunnable( + computation, inputDataWatermark, synchronizedProcessingTime, workItem); + dispatchExecutor.execute(r, MAX_GET_WORK_FETCH_BYTES); }); try { // Reconnect every now and again to enable better load balancing. @@ -1099,6 +1325,37 @@ void streamingDispatchLoop() { } } + private class DispatchStreamingWorkRunnable implements Runnable { + private final String computation; + private final @Nullable Instant inputDataWatermark; + private final Instant synchronizedProcessingTime; + private final Windmill.WorkItem workItem; + + DispatchStreamingWorkRunnable( + String computation, + @Nullable Instant inputDataWatermark, + Instant synchronizedProcessingTime, + Windmill.WorkItem workItem) { + this.computation = computation; + this.inputDataWatermark = inputDataWatermark; + this.synchronizedProcessingTime = synchronizedProcessingTime; + this.workItem = workItem; + } + + @Override + public void run() { + memoryMonitor.waitForResources("GetWork"); + getWorkItemBatchesReceived.addValue(1L); + computationWorkItemsReceived.addValue(1L); + + scheduleWorkItem( + getComputationState(computation), + inputDataWatermark, + synchronizedProcessingTime, + workItem); + } + } + private void scheduleWorkItem( final ComputationState computationState, final Instant inputDataWatermark, @@ -1298,7 +1555,22 @@ private void process( StageInfo stageInfo = stageInfoMap.computeIfAbsent( - mapTask.getStageName(), s -> new StageInfo(s, mapTask.getSystemName(), this)); + mapTask.getStageName(), + s -> { + String userName = mapTask.getSystemName(); + if (mapTask.getInstructions() != null) { + ParallelInstruction firstInstruction = + Iterables.getFirst(mapTask.getInstructions(), null); + + if (firstInstruction != null) { + userName = firstInstruction.getName(); + } + } + + publishTopologyUpdate(mapTask.getSystemName(), userName); + + return new StageInfo(s, mapTask.getSystemName(), userName, this); + }); ExecutionState executionState = null; @@ -1401,7 +1673,10 @@ private void process( computationId, key, workItem.getShardingKey(), - workItem.getWorkToken()); + workItem.getWorkToken(), + stageInfo.stateFetches, + itemsPerStateFetch, + stageInfo.stateFetchLatency); StateFetcher localStateFetcher = stateFetcher.byteTrackingView(); // If the read output KVs, then we can decode Windmill's byte key into a userland @@ -1441,6 +1716,8 @@ private void process( localStateFetcher, outputBuilder); + stageInfo.updateWatermarkMetrics(inputDataWatermark, outputDataWatermark); + // Blocks while executing work. executionState.getWorkExecutor().execute(); @@ -1476,6 +1753,8 @@ private void process( } commitQueue.put(new Commit(commitRequest, computationState, work)); + commitSizeBytes.addValue((long) estimatedCommitSize); + commitSizeBytesPerCommit.addValue((long) estimatedCommitSize); // Compute shuffle and state byte statistics these will be flushed asynchronously. long stateBytesWritten = outputBuilder.clearOutputMessages().build().getSerializedSize(); @@ -1600,6 +1879,7 @@ private void commitLoop() { Windmill.CommitWorkRequest.Builder commitRequestBuilder = Windmill.CommitWorkRequest.newBuilder(); long commitBytes = 0; + List commitStartTimes = new ArrayList<>(); // Block until we have a commit, then batch with additional commits. Commit commit = null; try { @@ -1609,6 +1889,7 @@ private void commitLoop() { continue; } while (commit != null) { + commitStartTimes.add(commit.getCommitCreateTime()); ComputationState computationState = commit.getComputationState(); commit.getWork().setState(State.COMMITTING); Windmill.ComputationCommitWorkRequest.Builder computationRequestBuilder = @@ -1632,6 +1913,12 @@ private void commitLoop() { activeCommitBytes.set(commitBytes); windmillServer.commitWork(commitRequest); activeCommitBytes.set(0); + + Instant commitEndTime = Instant.now(); + for (Instant commitStartTime : commitStartTimes) { + commitDurationMs.addValue(commitEndTime.getMillis() - commitStartTime.getMillis()); + } + for (Map.Entry entry : computationRequestMap.entrySet()) { ComputationState computationState = entry.getKey(); @@ -1650,6 +1937,7 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) final ComputationState state = commit.getComputationState(); final Windmill.WorkItemCommitRequest request = commit.getRequest(); final int size = commit.getSize(); + final Instant commitCreateTime = commit.getCommitCreateTime(); commit.getWork().setState(State.COMMITTING); activeCommitBytes.addAndGet(size); if (commitStream.commitWorkItem( @@ -1665,6 +1953,7 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) .invalidate(request.getKey(), request.getShardingKey()); } activeCommitBytes.addAndGet(-size); + commitDurationMs.addValue(Instant.now().getMillis() - commitCreateTime.getMillis()); // This may throw an exception if the commit was not active, which is possible if it // was deemed stuck. state.completeWork( @@ -1704,10 +1993,9 @@ private Commit batchCommitsToStream(CommitWorkStream commitStream) { return null; } - private void streamingCommitLoop() { + private void streamingCommitLoop(int numCommitStreams) { StreamPool streamPool = - new StreamPool<>( - NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream); + new StreamPool<>(numCommitStreams, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream); Commit initialCommit = null; while (running.get()) { if (initialCommit == null) { @@ -2016,9 +2304,16 @@ private void updateVMMetrics() { javaHarnessMaxMemory.addValue(maxMemory); } + private void updateCommitQueueStats() { + currentCommitSizeBytes.getAndReset(); + currentCommitSizeBytes.addValue((long) commitQueue.weight()); + } + @VisibleForTesting public void reportPeriodicWorkerUpdates() { updateVMMetrics(); + updateStateCacheStats(); + updateCommitQueueStats(); try { sendWorkerUpdatesToDataflowService(pendingDeltaCounters, pendingCumulativeCounters); } catch (IOException e) { @@ -2043,6 +2338,36 @@ private Object getCounterUpdateKey(CounterUpdate counterUpdate) { return key; } + private void updateStateCacheStats() { + stateCacheWeight.getAndReset(); + stateCacheWeight.addValue(stateCache.getWeight()); + + stateCacheSize.getAndReset(); + stateCacheSize.addValue(stateCache.getSize()); + + stateCacheEvictions.getAndReset(); + stateCacheEvictions.addValue(stateCache.getEvictionCount()); + + stateCacheHitRate.getAndReset(); + stateCacheHitRate.addValue(stateCache.getHitRate() * 100); + + stateCacheHits.getAndReset(); + stateCacheHits.addValue(stateCache.getHits()); + + stateCacheRequests.getAndReset(); + stateCacheRequests.addValue(stateCache.getRequests()); + + stateCacheStaleWorkTokenMisses.getAndReset(); + stateCacheStaleWorkTokenMisses.addValue(stateCache.getStaleWorkTokenMisses()); + + stateCacheInvalidateRequests.getAndReset(); + stateCacheInvalidateRequests.addValue(stateCache.getInvalidateRequests()); + + stateCacheInvalidatesFromInconsistentToken.getAndReset(); + stateCacheInvalidatesFromInconsistentToken.addValue( + stateCache.getInvalidatesFromInconsistentToken()); + } + /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { @@ -2119,6 +2444,8 @@ private void sendWorkerUpdatesToDataflowService( } } + publishCounterUpdates(counterUpdates); + // Handle duplicate counters from different stages. Store all the counters in a multi-map and // send the counters that appear multiple times in separate RPCs. Same logical counter could // appear in multiple stages if a step runs in multiple stages (as with flatten-unzipped stages) @@ -2189,6 +2516,26 @@ private void sendWorkerUpdatesToDataflowService( } } + private void publishCounterUpdates(List updates) { + try { + for (WorkerMetricsReceiver receiver : workerMetricReceivers) { + receiver.receiverCounterUpdates(updates); + } + } catch (Exception e) { + LOG.error("Error publishing counter updates", e); + } + } + + private void publishTopologyUpdate(String systemName, String userName) { + for (WorkerMetricsReceiver receiver : workerMetricReceivers) { + try { + receiver.receiveTopologyUpdate(systemName, userName); + } catch (Exception e) { + LOG.error("Error publishing topology update", e); + } + } + } + /** * Sends a GetData request to Windmill for all sufficiently old active work. * diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java index 74bba59531283..4c31b3d2a90d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Objects; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -72,15 +73,21 @@ public class WindmillStateCache implements StatusDataProvider { new MapMaker().weakValues().concurrencyLevel(4).makeMap(); private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. + private final LongAdder invalidateRequests = new LongAdder(); + private final LongAdder invalidatesFromInconsistentToken = new LongAdder(); + private final LongAdder staleWorkTokenMiss = new LongAdder(); + private final LongAdder cacheHits = new LongAdder(); + private final LongAdder cacheRequests = new LongAdder(); + public WindmillStateCache(long workerCacheMb) { final Weigher weigher = Weighers.weightedKeysAndValues(); workerCacheBytes = workerCacheMb * MEGABYTES; stateCache = CacheBuilder.newBuilder() .maximumWeight(workerCacheBytes) + .concurrencyLevel(Math.max(4, Runtime.getRuntime().availableProcessors())) .recordStats() .weigher(weigher) - .concurrencyLevel(4) .build(); } @@ -111,6 +118,10 @@ public long getWeight() { return w.idWeight + w.entryWeight; } + public long getSize() { + return stateCache.size(); + } + public long getMaxWeight() { return workerCacheBytes; } @@ -163,6 +174,7 @@ public void invalidate(ByteString processingKey, long shardingKey) { // By removing the ForKey object, all state for the key is orphaned in the cache and will // be removed by normal cache cleanup. keyIndex.remove(key); + invalidateRequests.increment(); } /** @@ -241,11 +253,18 @@ public String getStateFamily() { } public @Nullable T get(StateNamespace namespace, StateTag address) { + cacheRequests.increment(); StateId id = new StateId(forKey, stateFamily, namespace); @SuppressWarnings("nullness") // Unsure how to annotate lambda return allowing null. @Nullable StateCacheEntry entry = localCache.computeIfAbsent(id, key -> stateCache.getIfPresent(key)); - return entry == null ? null : entry.get(namespace, address); + + if (entry != null) { + cacheHits.increment(); + return entry.get(namespace, address); + } else { + return null; + } } public void put( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java index 7aed292f1ece0..db8e6a6d43962 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java @@ -2061,7 +2061,7 @@ public void clear() { public Future persist(WindmillStateCache.ForKeyAndFamily cache) throws IOException { if (hasLocalAdditions) { - if (COMPACT_NOW.get().get() || bag.valuesAreCached()) { + if (bag.valuesAreCached()) { // Implicitly clears the bag and combines local and persisted accumulators. localAdditionsAccum = getAccum(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java index 8bae4cd232cc9..c9ab7d4f6a99a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java @@ -40,6 +40,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.WindmillStateReader.StateTag.Kind; +import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListEntry; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange; @@ -64,6 +66,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ForwardingFuture; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; +import org.joda.time.DateTimeUtils; import org.joda.time.Instant; /** @@ -200,17 +203,27 @@ public ValuesAndContPosition(List values, @Nullable ContinuationT continuatio private long bytesRead = 0L; + private final Counter stateFetches; + private final Counter itemsPerStateFetch; + private final Counter stateFetchLatencyMs; + public WindmillStateReader( MetricTrackingWindmillServerStub server, String computation, ByteString key, long shardingKey, - long workToken) { + long workToken, + Counter stateFetches, + Counter itemsPerStateFetch, + Counter stateFetchLatencyMs) { this.server = server; this.computation = computation; this.key = key; this.shardingKey = shardingKey; this.workToken = workToken; + this.stateFetches = stateFetches; + this.itemsPerStateFetch = itemsPerStateFetch; + this.stateFetchLatencyMs = stateFetchLatencyMs; } private static final class CoderAndFuture { @@ -470,8 +483,16 @@ public void startBatchAndBlock() { return; } + this.stateFetches.addValue(1L); + this.itemsPerStateFetch.addValue((long) toFetch.size()); + + long startTimeMs = DateTimeUtils.currentTimeMillis(); Windmill.KeyedGetDataRequest request = createRequest(toFetch); Windmill.KeyedGetDataResponse response = server.getStateData(computation, request); + + long fetchDurationMs = DateTimeUtils.currentTimeMillis() - startTimeMs; + this.stateFetchLatencyMs.addValue(fetchDurationMs); + if (response == null) { throw new RuntimeException("Windmill unexpectedly returned null for request " + request); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java index 4c37dbc548d65..31b86cc49024d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java @@ -40,6 +40,7 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.dataflow.WorkerMetricsReceiver; import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; @@ -48,6 +49,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -72,6 +74,7 @@ public class WorkItemStatusClient { private final WorkUnitClient workUnitClient; private @Nullable DataflowWorkExecutor worker; private Long nextReportIndex; + private final Iterable workerMetricReceivers; private transient String uniqueWorkId = null; private boolean finalStateSent = false; @@ -91,6 +94,7 @@ public WorkItemStatusClient(WorkUnitClient workUnitClient, WorkItem workItem) { this.workItem = workItem; this.nextReportIndex = checkNotNull(workItem.getInitialReportIndex(), "WorkItem missing initial report index"); + this.workerMetricReceivers = ReflectHelpers.loadServicesOrdered(WorkerMetricsReceiver.class); } public String uniqueWorkId() { @@ -305,6 +309,9 @@ synchronized void populateCounterUpdates(WorkItemStatus status) { // MSec counters reported in worker extractMsecCounters(isFinalUpdate).forEach(appendCounterUpdate); + // Extract memory metrics in worker + extractMemoryCounters().forEach(appendCounterUpdate); + // Metrics reported in SDK runner. // This includes all different kinds of metrics coming from SDK. // Keep in mind that these metrics might contain different types of counter names: @@ -312,6 +319,17 @@ synchronized void populateCounterUpdates(WorkItemStatus status) { worker.extractMetricUpdates().forEach(appendCounterUpdate); status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values())); + publishCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values())); + } + + private void publishCounterUpdates(List updates) { + try { + for (WorkerMetricsReceiver receiver : workerMetricReceivers) { + receiver.receiverCounterUpdates(updates); + } + } catch (Exception e) { + LOG.error("Error publishing counter updates", e); + } } private synchronized Iterable extractCounters(@Nullable CounterSet counters) { @@ -346,6 +364,12 @@ public Iterable extractMsecCounters(boolean isFinalUpdate) { : executionContext.extractMsecCounters(isFinalUpdate); } + public Iterable extractMemoryCounters() { + return executionContext == null + ? Collections.emptyList() + : executionContext.extractMemoryCounters(); + } + public long extractThrottleTime() { return executionContext == null ? 0L : executionContext.extractThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactory.java index e1ae9ba95ea1d..73b329acbbe2a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactory.java @@ -19,16 +19,20 @@ import com.google.auto.value.AutoValue; import java.math.RoundingMode; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramIterationValue; +import org.HdrHistogram.Recorder; import org.apache.beam.runners.dataflow.worker.counters.Counter.AtomicCounterValue; import org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AtomicDouble; @@ -215,34 +219,6 @@ public static CounterDistribution empty() { return EMPTY; } - /** Returns the {@link CounterDistribution} resulting from adding the given value. */ - public final CounterDistribution addValue(long value) { - Preconditions.checkArgument( - value >= 0, "Distribution counters support only non-negative numbers."); - - long min = Math.min(this.getMin(), value); - long max = Math.max(this.getMax(), value); - long count = this.getCount() + 1; - long sum = this.getSum() + value; - // TODO: Replace sum-of-squares with statistics for a better stddev algorithm. - double sumOfSquares = this.getSumOfSquares() + Math.pow(value, 2); - - int bucketIndex = calculateBucket(value); - List buckets = incrementBucket(bucketIndex); - int firstBucketOffset = - this.getBuckets().isEmpty() - ? bucketIndex - : Math.min(this.getFirstBucketOffset(), bucketIndex); - - return CounterDistribution.builder() - .minMax(min, max) - .count(count) - .sum(sum) - .sumOfSquares(sumOfSquares) - .buckets(firstBucketOffset, buckets) - .build(); - } - /** There are 3 buckets for every power of ten: 1, 2, and 5. */ private static final int BUCKETS_PER_10 = 3; @@ -266,51 +242,6 @@ static int calculateBucket(long value) { return 1 + (log10Floor * BUCKETS_PER_10) + bucketOffsetWithinPowerOf10; } - - /** - * Increment the bucket for the given index, and return a new list of buckets. - * - *

If the bucket at the given index is already in the list, this will increment the existing - * value. If the specified index is outside of the current bucket range, the bucket list will be - * extended to incorporate the new bucket. - */ - private List incrementBucket(int bucketIndex) { - int firstBucketOffset = getFirstBucketOffset(); - List curBuckets = getBuckets(); - ImmutableList.Builder newBuckets = ImmutableList.builder(); - - if (getBuckets().isEmpty()) { - // Initial bucket - newBuckets.add(1L); - - } else if (bucketIndex < firstBucketOffset) { - // New prefix bucket - newBuckets.add(1L); - for (int i = bucketIndex + 1; i < firstBucketOffset; i++) { - newBuckets.add(0L); - } - newBuckets.addAll(curBuckets); - - } else if (bucketIndex >= firstBucketOffset + curBuckets.size()) { - // New suffix bucket - newBuckets.addAll(curBuckets); - for (int i = firstBucketOffset + curBuckets.size(); i < bucketIndex; i++) { - newBuckets.add(0L); - } - newBuckets.add(1L); - - } else { - // Value in existing bucket - int curIndex = firstBucketOffset; - for (Long curValue : curBuckets) { - long newValue = (bucketIndex == curIndex) ? curValue + 1 : curValue; - newBuckets.add(newValue); - curIndex++; - } - } - - return newBuckets.build(); - } } private abstract static class BaseCounterValue @@ -846,29 +777,73 @@ public String toString() { /** Implements a {@link Counter} for tracking a distribution of long values. */ public static class DistributionCounterValue extends BaseCounterValue { - // TODO: Using CounterDistribution internally is likely very expensive as each - // update requires copying the buckets list into a new instance. This should be profiled - // and likely optimized to use a mutable internal representation of the value. - private final AtomicReference aggregate = new AtomicReference<>(); + private static final AtomicLongFieldUpdater SUM_UPDATER = + AtomicLongFieldUpdater.newUpdater(DistributionCounterValue.class, "sumValue"); + + private static final AtomicLongFieldUpdater SUM_OF_SQUARES_UPDATER = + AtomicLongFieldUpdater.newUpdater(DistributionCounterValue.class, "sumOfSquares"); + + private final Recorder histogram = new Recorder(2); @Override public void addValue(Long value) { - CounterDistribution current; - CounterDistribution update; - do { - current = aggregate.get(); - update = current.addValue(value); - } while (!aggregate.compareAndSet(current, update)); + histogram.recordValue(value); + SUM_UPDATER.addAndGet(this, value); + SUM_OF_SQUARES_UPDATER.addAndGet(this, (long) Math.pow(value, 2)); + } + + private static CounterDistribution toCounterDistribution( + Histogram v, long sum, double sumOfSquares) { + if (v.getTotalCount() == 0) return CounterDistribution.EMPTY; + + List buckets = new ArrayList<>(); + int firstBucket = -1; + for (HistogramIterationValue r : v.recordedValues()) { + int bucket = CounterDistribution.calculateBucket(r.getValueIteratedTo()); + if (firstBucket == -1) firstBucket = bucket; + + if (bucket >= buckets.size()) { + for (int i = buckets.size(); i <= bucket; i++) { + buckets.add(0L); + } + } + + Long existingValue = buckets.get(bucket); + buckets.set(bucket, existingValue + r.getCountAtValueIteratedTo()); + } + + if (firstBucket > 0) { + buckets = buckets.subList(firstBucket, buckets.size()); + } + + return CounterDistribution.builder() + .min(v.getMinValue()) + .max(v.getMaxValue()) + .count(v.getTotalCount()) + .sum(sum) + .sumOfSquares(sumOfSquares) + .buckets(firstBucket, ImmutableList.copyOf(buckets)) + .build(); } @Override public CounterDistribution getAggregate() { - return aggregate.get(); + throw new UnsupportedOperationException("Cumulative histograms are unsupported"); } @Override public CounterDistribution getAndReset() { - return aggregate.getAndSet(CounterDistribution.empty()); + final Histogram snapshot; + final long sum; + final double sumOfSquares; + + snapshot = histogram.getIntervalHistogram(); + histogram.reset(); + + sum = SUM_UPDATER.getAndSet(this, 0); + sumOfSquares = SUM_OF_SQUARES_UPDATER.getAndSet(this, 0); + + return toCounterDistribution(snapshot, sum, sumOfSquares); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java index 493962e4f325a..b2386dc270dca 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java @@ -41,6 +41,9 @@ import javax.management.ObjectName; import javax.management.ReflectionException; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterName; +import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; @@ -185,6 +188,9 @@ public long totalGCTimeMilliseconds() { private final AtomicDouble maxGCPercentage = new AtomicDouble(0.0); private final AtomicInteger numPushbacks = new AtomicInteger(0); + private final Counter numPushbacksCounter; + private final Counter isThrashingGauge; + /** Wait point for threads in pushback waiting for gc thrashing to pass. */ private final Object waitingForResources = new Object(); @@ -198,7 +204,10 @@ public long totalGCTimeMilliseconds() { private final File localDumpFolder; - public static MemoryMonitor fromOptions(PipelineOptions options) { + public static MemoryMonitor fromOptions( + PipelineOptions options, + Counter numPushbacksCounter, + Counter isThrashingGauge) { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); String uploadToGCSPath = debugOptions.getSaveHeapDumpsToGcsPath(); boolean canDumpHeap = uploadToGCSPath != null || debugOptions.getDumpHeapOnOOM(); @@ -211,7 +220,9 @@ public static MemoryMonitor fromOptions(PipelineOptions options) { canDumpHeap, gcThrashingPercentagePerPeriod, uploadToGCSPath, - getLoggingDir()); + getLoggingDir(), + numPushbacksCounter, + isThrashingGauge); } @VisibleForTesting @@ -223,6 +234,7 @@ static MemoryMonitor forTest( double gcThrashingPercentagePerPeriod, @Nullable String uploadToGCSPath, File localDumpFolder) { + CounterSet noops = new CounterSet(); return new MemoryMonitor( gcStatsProvider, sleepTimeMillis, @@ -230,7 +242,9 @@ static MemoryMonitor forTest( canDumpHeap, gcThrashingPercentagePerPeriod, uploadToGCSPath, - localDumpFolder); + localDumpFolder, + noops.longSum(CounterName.named("numPushbacks")), + noops.longSum(CounterName.named("isThrashing"))); } private MemoryMonitor( @@ -240,7 +254,9 @@ private MemoryMonitor( boolean canDumpHeap, double gcThrashingPercentagePerPeriod, @Nullable String uploadToGCSPath, - File localDumpFolder) { + File localDumpFolder, + Counter numPushbacksCounter, + Counter isThrashingGauge) { this.gcStatsProvider = gcStatsProvider; this.sleepTimeMillis = sleepTimeMillis; this.shutDownAfterNumGCThrashing = shutDownAfterNumGCThrashing; @@ -248,6 +264,8 @@ private MemoryMonitor( this.gcThrashingPercentagePerPeriod = gcThrashingPercentagePerPeriod; this.uploadToGCSPath = uploadToGCSPath; this.localDumpFolder = localDumpFolder; + this.numPushbacksCounter = numPushbacksCounter; + this.isThrashingGauge = isThrashingGauge; } /** For testing only: Wait for the monitor to be running. */ @@ -381,6 +399,9 @@ private void updateIsThrashing() { private void setIsThrashing(boolean serverInGcThrashing) { synchronized (waitingForResources) { synchronized (waitingForStateChange) { + isThrashingGauge.getAndReset(); + isThrashingGauge.addValue(serverInGcThrashing ? 1L : 0L); + boolean prev = isThrashing.getAndSet(serverInGcThrashing); if (prev && !serverInGcThrashing) { waitingForResources.notifyAll(); @@ -508,7 +529,11 @@ public void run() { updateIsThrashing(); if (lastLog < 0 || lastLog + NORMAL_LOGGING_PERIOD_MILLIS < now) { - LOG.info("Memory is {}", describeMemory()); + if (isThrashing.get()) { + LOG.info("Memory is {}", describeMemory()); + } else { + LOG.debug("Memory is {}", describeMemory()); + } lastLog = now; } @@ -540,6 +565,8 @@ public void waitForResources(String context) { return; } numPushbacks.incrementAndGet(); + numPushbacksCounter.addValue(1L); + LOG.info("Waiting for resources for {}. Memory is {}", context, describeMemory()); synchronized (waitingForResources) { boolean interrupted = false; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTrackerTest.java index bdb714dd53d89..f9e3d68a89e8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTrackerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTrackerTest.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; @@ -237,11 +238,11 @@ private CounterDistribution getCounterValue(NameContext step) { /** Build a distribution from the specified values. */ private CounterDistribution distribution(long... values) { - CounterDistribution dist = CounterDistribution.empty(); + CounterFactory.DistributionCounterValue dist = new CounterFactory.DistributionCounterValue(); for (long value : values) { - dist = dist.addValue(value); + dist.addValue(value); } - return dist; + return dist.getAndReset(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java index edcb1915c0473..f928a23c5e5d4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java @@ -26,6 +26,8 @@ import java.util.AbstractMap; import java.util.Map; import java.util.concurrent.Future; +import org.apache.beam.runners.dataflow.worker.counters.CounterName; +import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListEntry; @@ -80,8 +82,17 @@ private static void assertNoReader(Object obj) throws Exception { public void setUp() { MockitoAnnotations.initMocks(this); + CounterSet counterSet = new CounterSet(); underTest = - new WindmillStateReader(mockWindmill, COMPUTATION, DATA_KEY, SHARDING_KEY, WORK_TOKEN); + new WindmillStateReader( + mockWindmill, + COMPUTATION, + DATA_KEY, + SHARDING_KEY, + WORK_TOKEN, + counterSet.longSum(CounterName.named("stateFetches")), + counterSet.distribution(CounterName.named("itemsPerFetch")), + counterSet.distribution(CounterName.named("fetchLatency"))); } private Windmill.Value intValue(int value) throws IOException { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactoryTest.java index 6a13fb291b94a..3c9f2ba2dc2e6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterFactoryTest.java @@ -59,17 +59,19 @@ private void verifyDistributionBucket(long value, int expectedBucket) { @Test public void testCounterDistributionAddValue() { - CounterDistribution counter = CounterDistribution.empty(); + CounterFactory.DistributionCounterValue dcv = new CounterFactory.DistributionCounterValue(); List expectedBuckets = ImmutableList.of(1L, 3L, 0L, 0L, 0L, 0L, 0L, 0L, 1L, 1L); for (long value : new long[] {1, 500, 2, 3, 1000, 4}) { - counter = counter.addValue(value); + dcv.addValue(value); } + CounterDistribution counter = dcv.getAggregate(); assertEquals(expectedBuckets, counter.getBuckets()); assertEquals(1250030.0, counter.getSumOfSquares(), 0); assertEquals(1510, counter.getSum()); assertEquals(1, counter.getFirstBucketOffset()); assertEquals(6, counter.getCount()); assertEquals(1, counter.getMin()); - assertEquals(1000, counter.getMax()); + // using a histogram to track the max causes slight imprecision in the max + assertEquals(1003, counter.getMax()); } }