From e5fac573eed60e8ec1ec31ccad50423f2da27f92 Mon Sep 17 00:00:00 2001 From: Irene Onyeneho Date: Thu, 18 Mar 2021 09:11:49 -0700 Subject: [PATCH] Adds AVRO_PARSER_NUM_MINIBATCH to override num_minibatches and logs the parsing time (#1283) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues -Adds test method for _require() function -This update adds a test to check if ValueErrors are raised when given an invalid input for num_parallel_calls * Bump Apache Arrow to 2.0.0 (#1231) * Bump Apache Arrow to 2.0.0 Also bumps Apache Thrift to 0.13.0 Signed-off-by: Yong Tang * Update code to match Arrow Signed-off-by: Yong Tang * Bump pyarrow to 2.0.0 Signed-off-by: Yong Tang * Stay with version=1 for write_feather to pass tests Signed-off-by: Yong Tang * Bump flatbuffers to 1.12.0 Signed-off-by: Yong Tang * Fix Windows issue Signed-off-by: Yong Tang * Fix tests Signed-off-by: Yong Tang * Fix Windows Signed-off-by: Yong Tang * Remove -std=c++11 and leave default -std=c++14 for arrow build Signed-off-by: Yong Tang * Update sha256 of libapr1 As the hash changed by the repo. Signed-off-by: Yong Tang * Add emulator for gcs (#1234) * Bump com_github_googleapis_google_cloud_cpp to `1.21.0` * Add gcs testbench * Bump `libcurl` to `7.69.1` * Remove the CI build for CentOS 8 (#1237) Building shared libraries on CentOS 8 is pretty much the same as on Ubuntu 20.04 except `apt` should be changed to `yum`. For that our CentOS 8 CI test is not adding a lot of value. Furthermore with the upcoming CentOS 8 change: https://www.phoronix.com/scan.php?page=news_item&px=CentOS-8-Ending-For-Stream CentOS 8 is effectively EOLed at 2021. For that we may want to drop the CentOS 8 build (only leave a comment in README.md) Note we keep CentOS 7 build for now as there are still many users using CentOS 7 and CentOS 7 will only be EOLed at 2024. We might drop CentOS 7 build in the future as well if there is similiar changes to CentOS 7 like CentOS 8. Signed-off-by: Yong Tang * add tf-c-header rule (#1244) * Skip tf-nightly:tensorflow-io==0.17.0 on API compatibility test (#1247) Signed-off-by: Yong Tang * [s3] add support for testing on macOS (#1253) * [s3] add support for testing on macOS * modify docker-compose cmd * add notebook formatting instruction in README (#1256) * [docs] Restructure README.md content (#1257) * Refactor README.md content * bump to run ci jobs * Update libtiff/libgeotiff dependency (#1258) This PR updates libtiff/libgeotiff to the latest version. Signed-off-by: Yong Tang * remove unstable elasticsearch test setup on macOS (#1263) * Exposes num_parallel_reads and num_parallel_calls (#1232) -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues - Adds test method for _require() function -This update adds a test to check if ValueErrors are raised when given an invalid input for num_parallel_calls Co-authored-by: Abin Shahab * Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches. This is recommended to be set equal to the vcore request. * Exposes num_parallel_reads and num_parallel_calls (#1232) * Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues * Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues * Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues * Fixes Lint Issues * Removes Optional typing for method parameter - * Adds test method for _require() function -This update adds a test to check if ValueErrors are raised when given an invalid input for num_parallel_calls * Uncomments skip for macOS pytests * Fixes Lint issues Co-authored-by: Abin Shahab * add avro tutorial testing data (#1267) Co-authored-by: Cheng Ren <1428327+chengren311@users.noreply.github.com> * Update Kafka tutorial to work with Apache Kafka (#1266) * Update Kafka tutorial to work with Apache Kafka Minor update to the Kafka tutorial to remove the dependency on Confluent's distribution of Kafka, and instead work with vanilla Apache Kafka. Signed-off-by: Dale Lane * Address review comments Remove redundant pip install commands Signed-off-by: Dale Lane * add github workflow for performance benchmarking (#1269) * add github workflow for performance benchmarking * add github-action-benchmark step * handle missing dependencies while benchmarking (#1271) * handle missing dependencies while benchmarking * setup test_sql * job name change * set auto-push to true * remove auto-push * add personal access token * use alternate method to push to gh-pages * add name to the action * use different id * modify creds * use github_token * change repo name * set auto-push * set origin and push results * set env * use PERSONAL_GITHUB_TOKEN * use push changes action * use github.head_ref to push the changes * try using fetch-depth * modify branch name * use alternative push approach * git switch - * test by merging with forked master * Disable s3 macOS for now as docker is not working on GitHub Actions for macOS (#1277) * Revert "[s3] add support for testing on macOS (#1253)" This reverts commit 81789bde99e62523ca4d9f460bb345c666902acd. Signed-off-by: Yong Tang * Update Signed-off-by: Yong Tang * rename testing data files (#1278) * Add tutorial for avro dataset API (#1250) * remove docker based mongodb tests in macos (#1279) * trigger benchmarks workflow only on commits (#1282) * Bump Apache Arrow to 3.0.0 (#1285) Signed-off-by: Yong Tang * Add bazel cache (#1287) Signed-off-by: Yong Tang * Add initial bigtable stub test (#1286) * Add initial bigtable stub test Signed-off-by: Yong Tang * Fix kokoro test Signed-off-by: Yong Tang * Add reference to github-pages benchmarks in README (#1289) * add reference to github-pages benchmarks * minor grammar change * Update README.md Co-authored-by: Yuan Tang Co-authored-by: Yuan Tang * Clear outputs (#1292) * fix kafka online-learning section in tutorial notebook (#1274) * kafka notebook fix for colab env * change timeout from 30 to 20 seconds * reduce stream_timeout * Only enable bazel caching writes for tensorflow/io github actions (#1293) This PR updates so that only GitHub actions run on tensorflow/io repo will be enabled with bazel cache writes. Without the updates, a focked repo actions will cause error. Note once bazel cache read-permissions are enabled from gcs forked repo will be able to access bazel cache (read-only). Signed-off-by: Yong Tang * Enable ready-only bazel cache (#1294) This PR enables read-only bazel cache Signed-off-by: Yong Tang * Rename tests (#1297) * Combine Ubuntu 20.04 and CentOS 7 tests into one GitHub jobs (#1299) When GitHub Actions runs it looks like there is an implicit concurrent jobs limit. As such the CentOS 7 test normally is scheduled later after other jobs completes. However, many times CentOS 7 test hangs (e.g., https://github.com/tensorflow/io/runs/1825943449). This is likely due to the CentOS 7 test is on the GitHub Actions queue for too long. This PR moves CentOS 7 to run after Ubuntu 20.04 test complete, to try to avoid hangs. Signed-off-by: Yong Tang * Update names of api tests (#1300) We renamed the tests to remove "_eager" parts. This PR updates the api test for correct filenames Signed-off-by: Yong Tang * Fix wrong benchmark tests names (#1301) Fixes wrong benchmark tests names caused by last commit Signed-off-by: Yong Tang * Patch arrow to temporarily resolve the ARROW-11518 issue (#1304) This PR patchs arrow to temporarily resolve the ARROW-11518 issue. See 1281 for details Credit to diggerk. We will update arrow after the upstream PR is merged. Signed-off-by: Yong Tang * Remove AWS headers from tensorflow, and use headers from third_party … (#1241) * Remove external headers from tensorflow, and use third_party headers instead This PR removes external headers from tensorflow, and use third_party headers instead. Signed-off-by: Yong Tang * Address review comment Signed-off-by: Yong Tang * Switch to use github to download libgeotiff (#1307) Signed-off-by: Yong Tang * Add @com_google_absl//absl/strings:cord (#1308) Fix read/STDIN_FILENO Signed-off-by: Yong Tang * Switch to modular file system for hdfs (#1309) * Switch to modular file system for hdfs This PR is part of the effort to switch to modular file system for hdfs. When TF_ENABLE_LEGACY_FILESYSTEM=1 is provided, old behavior will be preserved. Signed-off-by: Yong Tang * Build against tf-nightly Signed-off-by: Yong Tang * Update tests Signed-off-by: Yong Tang * Adjust the if else logic, follow review comment Signed-off-by: Yong Tang * Disable test_write_kafka test for now. (#1310) With tensorflow upgrade to tf-nightly, the test_write_kafka test is failing and that is block the plan to modular file system migration. This PR disables the test temporarily so that CI can continue to push tensorflow-io-nightly image (needed for modular file system migration) Signed-off-by: Yong Tang * Switch to modular file system for s3 (#1312) This PR is part of the effort to switch to modular file system for s3. When TF_ENABLE_LEGACY_FILESYSTEM=1 is provided, old behavior will be preserved. Signed-off-by: Yong Tang * Add python 3.9 on Windows (#1316) * Updates the PR to use attribute instead of Env Variable -Originally AVRO_PARSER_NUM_MINIBATCH was set as an environmental variable. Because tensorflow-io rarely uses env vars to fine tune kernal ops this was changed to an attribute. See comment here: https://github.com/tensorflow/io/pull/1283#issuecomment-767747791 * Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches. This is recommended to be set equal to the vcore request. * Updates the PR to use attribute instead of Env Variable -Originally AVRO_PARSER_NUM_MINIBATCH was set as an environmental variable. Because tensorflow-io rarely uses env vars to fine tune kernal ops this was changed to an attribute. See comment here: https://github.com/tensorflow/io/pull/1283#issuecomment-767747791 * Adds addtional comments in source code for understandability Co-authored-by: Abin Shahab Co-authored-by: Yong Tang Co-authored-by: Vo Van Nghia Co-authored-by: Vignesh Kothapalli Co-authored-by: Cheng Ren Co-authored-by: Cheng Ren <1428327+chengren311@users.noreply.github.com> Co-authored-by: Dale Lane Co-authored-by: Yuan Tang Co-authored-by: Mark Daoust --- .github/workflows/build.yml | 15 +++++++ .../core/kernels/avro/parse_avro_kernels.cc | 45 +++++++++++++++---- .../kernels/avro/utils/avro_parser_tree.cc | 20 ++++++++- tensorflow_io/core/ops/avro_ops.cc | 4 ++ .../python/experimental/parse_avro_ops.py | 2 + 5 files changed, 76 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 54a8e5ead..f3e8c8cff 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -88,6 +88,21 @@ jobs: docker run -i --rm -v $PWD:/v -w /v --net=host \ -e BAZEL_OPTIMIZATION="${BAZEL_OPTIMIZATION}" \ ubuntu:20.04 bash -x -e source.sh + - name: CentOS 7 + run: | + if [[ "${EVENT_NAME}" == "push" && "${REPO_NAME}" == "tensorflow/io" ]]; then + printf '%s\n' "${GCP_CREDS}" >service_account_creds.json + export BAZEL_OPTIMIZATION="--remote_cache=https://storage.googleapis.com/tensorflow-sigs-io --remote_upload_local_results=true --google_credentials=service_account_creds.json" + else + export BAZEL_OPTIMIZATION="--remote_cache=https://storage.googleapis.com/tensorflow-sigs-io --remote_upload_local_results=false" + fi + set -x -e + bash -x -e .github/workflows/build.space.sh + python3 .github/workflows/build.instruction.py docs/development.md "##### CentOS 7" > source.sh + cat source.sh + docker run -i --rm -v $PWD:/v -w /v --net=host \ + -e BAZEL_OPTIMIZATION="${BAZEL_OPTIMIZATION}" \ + centos:7 bash -x -e source.sh macos-bazel: name: Bazel macOS diff --git a/tensorflow_io/core/kernels/avro/parse_avro_kernels.cc b/tensorflow_io/core/kernels/avro/parse_avro_kernels.cc index 6eecf841c..156d6c490 100644 --- a/tensorflow_io/core/kernels/avro/parse_avro_kernels.cc +++ b/tensorflow_io/core/kernels/avro/parse_avro_kernels.cc @@ -172,7 +172,9 @@ Status ParseAvro(const AvroParserConfig& config, const gtl::ArraySlice& serialized, thread::ThreadPool* thread_pool, AvroResult* result) { DCHECK(result != nullptr); - + using clock = std::chrono::system_clock; + using ms = std::chrono::duration; + const auto before = clock::now(); // Allocate dense output for fixed length dense values // (variable-length dense and sparse and ragged have to be buffered). /* std::vector fixed_len_dense_values(config.dense.size()); @@ -189,6 +191,10 @@ Status ParseAvro(const AvroParserConfig& config, // This parameter affects performance in a big and data-dependent way. const size_t kMiniBatchSizeBytes = 50000; + // avro_num_minibatches_ is int64 in the op interface. If not set + // the default value is 0. + size_t avro_num_minibatches_; + // Calculate number of minibatches. // In main regime make each minibatch around kMiniBatchSizeBytes bytes. // Apply 'special logic' below for small and big regimes. @@ -204,8 +210,13 @@ Status ParseAvro(const AvroParserConfig& config, minibatch_bytes = 0; } } - // 'special logic' - const size_t min_minibatches = std::min(8, serialized.size()); + if (avro_num_minibatches_) { + VLOG(5) << "Overriding num_minibatches with " << avro_num_minibatches_; + result = avro_num_minibatches_; + } + // This is to ensure users can control the num minibatches all the way down + // to size of 1(no parallelism). + const size_t min_minibatches = std::min(1, serialized.size()); const size_t max_minibatches = 64; return std::max(min_minibatches, std::min(max_minibatches, result)); @@ -245,13 +256,16 @@ Status ParseAvro(const AvroParserConfig& config, auto read_value = [&](avro::GenericDatum& d) { return range_reader.read(d); }; - + VLOG(5) << "Processing minibatch " << minibatch; status_of_minibatch[minibatch] = parser_tree.ParseValues( &buffers[minibatch], read_value, reader_schema, defaults); }; - + const auto before_parse = clock::now(); ParallelFor(ProcessMiniBatch, num_minibatches, thread_pool); - + const auto after_parse = clock::now(); + const ms parse_read_duration = after_parse - before_parse; + VLOG(5) << "PARSER_TIMING: Time spend reading and parsing " + << parse_read_duration.count() << " ms "; for (Status& status : status_of_minibatch) { TF_RETURN_IF_ERROR(status); } @@ -367,15 +381,22 @@ Status ParseAvro(const AvroParserConfig& config, return Status::OK(); }; - + const auto before_sparse_merge = clock::now(); for (size_t d = 0; d < config.sparse.size(); ++d) { TF_RETURN_IF_ERROR(MergeSparseMinibatches(d)); } - + const auto after_sparse_merge = clock::now(); + const ms s_merge_duration = after_sparse_merge - before_sparse_merge; for (size_t d = 0; d < config.dense.size(); ++d) { TF_RETURN_IF_ERROR(MergeDenseMinibatches(d)); } + const auto after_dense_merge = clock::now(); + const ms d_merge_duration = after_dense_merge - after_sparse_merge; + VLOG(5) << "PARSER_TIMING: Sparse merge duration" << s_merge_duration.count() + << " ms "; + VLOG(5) << "PARSER_TIMING: Dense merge duration" << d_merge_duration.count() + << " ms "; return Status::OK(); } @@ -388,6 +409,8 @@ class ParseAvroOp : public OpKernel { OP_REQUIRES_OK(ctx, ctx->GetAttr("sparse_types", &sparse_types_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_types", &dense_types_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_shapes", &dense_shapes_)); + OP_REQUIRES_OK( + ctx, ctx->GetAttr("avro_num_minibatches", &avro_num_minibatches_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("sparse_keys", &sparse_keys_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_keys", &dense_keys_)); @@ -401,6 +424,11 @@ class ParseAvroOp : public OpKernel { dense_shapes_[d].dims() > 1 && dense_shapes_[d].dim_size(0) == -1; } + // Check that avro_num_minibatches is not negative + OP_REQUIRES(ctx, avro_num_minibatches_ >= 0, + errors::InvalidArgument("Need avro_num_minibatches >= 0, got ", + avro_num_minibatches_)); + string reader_schema_str; OP_REQUIRES_OK(ctx, ctx->GetAttr("reader_schema", &reader_schema_str)); @@ -495,6 +523,7 @@ class ParseAvroOp : public OpKernel { avro::ValidSchema reader_schema_; size_t num_dense_; size_t num_sparse_; + int64 avro_num_minibatches_; private: std::vector> CreateKeysAndTypes() { diff --git a/tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc b/tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc index b1b12573e..b8d5616d5 100644 --- a/tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc +++ b/tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc @@ -81,6 +81,9 @@ Status AvroParserTree::ParseValues( const std::function read_value, const avro::ValidSchema& reader_schema, const std::map& defaults) const { + using clock = std::chrono::system_clock; + using ms = std::chrono::duration; + // new assignment of all buffers TF_RETURN_IF_ERROR(InitializeValueBuffers(key_to_value)); @@ -90,11 +93,24 @@ Status AvroParserTree::ParseValues( avro::GenericDatum datum(reader_schema); bool has_value = false; - - while ((has_value = read_value(datum))) { + ms parse_duration; + ms read_duration; + while (true) { + const auto before_read = clock::now(); + if (!(has_value = read_value(datum))) { + break; + } + const auto after_read = clock::now(); TF_RETURN_IF_ERROR((*root_).Parse(key_to_value, datum, defaults)); + const auto after_parse = clock::now(); + parse_duration += after_parse - after_read; + read_duration += after_read - before_read; } + VLOG(5) << "PARSER_TIMING: Avro Read times " << read_duration.count() + << " ms "; + VLOG(5) << "PARSER_TIMING: Avro Parse times " << parse_duration.count() + << " ms "; // add end marks to all buffers for batch TF_RETURN_IF_ERROR(AddFinishMarks(key_to_value)); diff --git a/tensorflow_io/core/ops/avro_ops.cc b/tensorflow_io/core/ops/avro_ops.cc index 2432292a0..34a8b19fb 100644 --- a/tensorflow_io/core/ops/avro_ops.cc +++ b/tensorflow_io/core/ops/avro_ops.cc @@ -83,6 +83,7 @@ REGISTER_OP("IO>ParseAvro") .Output("sparse_values: sparse_types") .Output("sparse_shapes: num_sparse * int64") .Output("dense_values: dense_types") + .Attr("avro_num_minibatches: int >= 0") .Attr("num_sparse: int >= 0") .Attr("reader_schema: string") .Attr("sparse_keys: list(string) >= 0") @@ -94,6 +95,7 @@ REGISTER_OP("IO>ParseAvro") .SetShapeFn([](shape_inference::InferenceContext* c) { size_t num_dense; size_t num_sparse; + int64 avro_num_minibatches; int64 num_sparse_from_user; std::vector sparse_types; std::vector dense_types; @@ -106,6 +108,8 @@ REGISTER_OP("IO>ParseAvro") TF_RETURN_IF_ERROR(c->GetAttr("sparse_types", &sparse_types)); TF_RETURN_IF_ERROR(c->GetAttr("dense_types", &dense_types)); TF_RETURN_IF_ERROR(c->GetAttr("dense_shapes", &dense_shapes)); + TF_RETURN_IF_ERROR( + c->GetAttr("avro_num_minibatches", &avro_num_minibatches)); TF_RETURN_IF_ERROR(c->GetAttr("sparse_keys", &sparse_keys)); TF_RETURN_IF_ERROR(c->GetAttr("sparse_ranks", &sparse_ranks)); diff --git a/tensorflow_io/core/python/experimental/parse_avro_ops.py b/tensorflow_io/core/python/experimental/parse_avro_ops.py index edfbbee79..341548435 100644 --- a/tensorflow_io/core/python/experimental/parse_avro_ops.py +++ b/tensorflow_io/core/python/experimental/parse_avro_ops.py @@ -130,6 +130,7 @@ def _parse_avro( dense_defaults=None, dense_shapes=None, name=None, + avro_num_minibatches=0, ): """Parses Avro records. @@ -196,6 +197,7 @@ def _parse_avro( dense_keys=dense_keys, dense_shapes=dense_shapes, name=name, + avro_num_minibatches=avro_num_minibatches, ) (sparse_indices, sparse_values, sparse_shapes, dense_values) = outputs