Skip to content

Commit

Permalink
[#27839][Go SDK] Write pipeline options to a file, instead reading fr…
Browse files Browse the repository at this point in the history
…om a flag. (#31482)

* [#27839] Move pipeline options file creation to tools package.

* Write options to a file in the container instead of burdening the command line.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Jun 4, 2024
1 parent a5738c1 commit e31e885
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 22 deletions.
4 changes: 3 additions & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func main() {
"--logging_endpoint=" + *loggingEndpoint,
"--control_endpoint=" + *controlEndpoint,
"--semi_persist_dir=" + *semiPersistDir,
"--options=" + options,
}
if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
}
if info.GetStatusEndpoint() != nil {
os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
Expand Down
39 changes: 39 additions & 0 deletions sdks/go/container/tools/pipeline_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tools

import (
"fmt"
"os"
)

// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file.
// Assumes the options string is JSON formatted.
//
// Stores the file name in question in PIPELINE_OPTIONS_FILE for access by the SDK.
func MakePipelineOptionsFileAndEnvVar(options string) error {
fn := "pipeline_options.json"
f, err := os.Create(fn)
if err != nil {
return fmt.Errorf("unable to create %v: %w", fn, err)
}
defer f.Close()
if _, err := f.WriteString(options); err != nil {
return fmt.Errorf("error writing %v: %w", f.Name(), err)
}
os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
return nil
}
17 changes: 16 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).")
//lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).")
options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).")
options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode). (deprecated)")
)

type exitMode int
Expand Down Expand Up @@ -93,6 +93,21 @@ func hook() {
// will be captured by the framework -- which may not be functional if
// harness.Main returns. We want to be sure any error makes it out.

pipelineOptionsFilename := os.Getenv("PIPELINE_OPTIONS_FILE")
if pipelineOptionsFilename != "" {
if *options != "" {
fmt.Fprintf(os.Stderr, "WARNING: env variable PIPELINE_OPTIONS_FILE set but options flag populated. Potentially bad container loader. Flag value before overwrite: %v\n", options)
}
contents, err := os.ReadFile(pipelineOptionsFilename)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read pipeline options file '%v': %v\n", pipelineOptionsFilename, err)
os.Exit(1)
}
// Overwite flag to be consistent with the legacy flag processing.
*options = string(contents)
}
// Load in pipeline options from the flag string. Used for both the new options file path
// and the older flag approach.
if *options != "" {
var opt runtime.RawOptionsWrapper
if err := json.Unmarshal([]byte(*options), &opt); err != nil {
Expand Down
24 changes: 4 additions & 20 deletions sdks/java/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func main() {
// (3) Invoke the Java harness, preserving artifact ordering in classpath.

os.Setenv("HARNESS_ID", *id)
if err := makePipelineOptionsFile(options); err != nil {
if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
}
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
Expand Down Expand Up @@ -247,29 +247,13 @@ func main() {
logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}

// makePipelineOptionsFile writes the pipeline options to a file.
// Assumes the options string is JSON formatted.
func makePipelineOptionsFile(options string) error {
fn := "pipeline_options.json"
f, err := os.Create(fn)
if err != nil {
return fmt.Errorf("unable to create %v: %w", fn, err)
}
defer f.Close()
if _, err := f.WriteString(options); err != nil {
return fmt.Errorf("error writing %v: %w", f.Name(), err)
}
os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
return nil
}

// heapSizeLimit returns 80% of the runner limit, if provided. If not provided,
// it returns 70% of the physical memory on the machine. If it cannot determine
// that value, it returns 1GB. This is an imperfect heuristic. It aims to
// ensure there is memory for non-heap use and other overhead, while also not
// underutilizing the machine. if set_recommended_max_xmx experiment is enabled,
// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops
// utilizes memory more efficiently, and has positive impact on GC performance
// underutilizing the machine. if set_recommended_max_xmx experiment is enabled,
// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops
// utilizes memory more efficiently, and has positive impact on GC performance
// and cache hit rate.
func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64 {
if setRecommendedMaxXmx {
Expand Down

0 comments on commit e31e885

Please sign in to comment.