diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go index 62dc5296d472d..ae2445dd60b6a 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "google.golang.org/grpc" @@ -231,3 +232,73 @@ func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*job // been via pointer. return res, nil } + +func startPythonExpansionService(service, extraPackage string) (stopFunc func() error, address string, err error) { + venvPython, err := expansionx.SetUpPythonEnvironment(extraPackage) + if err != nil { + return nil, "", err + } + log.Debugf(context.Background(), "path: %v", venvPython) + + serviceRunner, err := expansionx.NewPyExpansionServiceRunner(venvPython, service, "") + if err != nil { + return nil, "", fmt.Errorf("error in startAutomatedPythonExpansionService(%s,%s): %w", venvPython, service, err) + } + err = serviceRunner.StartService() + if err != nil { + return nil, "", fmt.Errorf("error in starting expansion service, StartService(): %w", err) + } + stopFunc = serviceRunner.StopService + address = serviceRunner.Endpoint() + return stopFunc, address, nil +} + +// QueryPythonExpansionService submits an external python transform to be expanded by the +// expansion service and then eagerly materializes the artifacts for staging. The given +// transform should be the external transform, and the components are any additional +// components necessary for the pipeline snippet. +// +// The address to be queried is determined by the Config field of the HandlerParams after +// the prefix tag indicating the automated service is in use. +func QueryPythonExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error) { + // Strip autoPython: tag to get actual python module + tag, module := parseAddr(p.Config) + // parse extra-packages from namespace if present + module, extraPackages := parseClasspath(module) + + stopFunc, address, err := startPythonExpansionService(module, extraPackages) + if err != nil { + return nil, err + } + defer stopFunc() + + p.Config = address + + res, err := QueryExpansionService(ctx, p) + if err != nil { + return nil, err + } + + exp := &graph.ExpandedTransform{ + Components: res.GetComponents(), + Transform: res.GetTransform(), + Requirements: res.GetRequirements(), + } + + p.ext.Expanded = exp + // Put correct expansion address into edge + p.edge.External.ExpansionAddr = address + + _, err = ResolveArtifactsWithConfig(ctx, []*graph.MultiEdge{p.edge}, ResolveConfig{}) + + if err != nil { + return nil, err + } + + // Restore tag so we know the artifacts have been materialized eagerly down the road. + p.edge.External.ExpansionAddr = tag + Separator + module + + // Can return the original response because all of our proto modification afterwards has + // been via pointer. + return res, nil +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go new file mode 100644 index 0000000000000..c7c90b65a9996 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xlangx + +import ( + "strings" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" +) + +func TestStartAutomated(t *testing.T) { + if strings.HasSuffix(core.SdkVersion, ".dev") { + t.Skipf("need a released SDK version to test auto python expansion service, got: %s", core.SdkVersion) + } + sp, addr, err := startPythonExpansionService("apache_beam.runners.portability.expansion_service_main", "") + if err != nil { + t.Fatal(err) + } + if addr == "" { + t.Fatal("no address") + } + if err := sp(); err != nil { + t.Fatal("error stoping service") + } +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go index 12488638faba4..0a7eb482cc407 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go @@ -23,10 +23,15 @@ import ( "io" "net/http" "os" + "os/exec" "os/user" "path" "path/filepath" + "sort" "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) type url string @@ -42,7 +47,8 @@ var defaultJarGetter = newJarGetter() const ( apacheRepository = url("https://repo.maven.apache.org/maven2") beamGroupID = "org/apache/beam" - jarCache = "~/.apache_beam/cache/jars" + cacheDir = "~/.apache_beam/cache" + jarCache = cacheDir + "/jars" ) func newJarGetter() *jarGetter { @@ -375,3 +381,67 @@ func jarExists(jarPath string) bool { _, err := os.Stat(jarPath) return err == nil } + +func getPythonVersion() (string, error) { + for _, v := range []string{"python", "python3"} { + cmd := exec.Command(v, "--version") + if err := cmd.Run(); err == nil { + return v, nil + } + } + return "", fmt.Errorf("no python installation found") +} + +// SetUpPythonEnvironment sets up the virtual ennvironment required for the +// Apache Beam Python SDK to run an expansion service module. +func SetUpPythonEnvironment(extraPackage string) (string, error) { + py, err := getPythonVersion() + if err != nil { + return "", fmt.Errorf("no python installation found: %v", err) + } + extraPackages := []string{} + if len(extraPackage) > 0 { + extraPackages = strings.Split(extraPackage, " ") + } + + // create python virtual environment + sort.Strings(extraPackages) + beamPackage := fmt.Sprintf("apache_beam[gcp,aws,azure,dataframe]==%s", core.SdkVersion) + venvDir := filepath.Join( + cacheDir, "venvs", + fmt.Sprintf("py-%s-beam-%s-%s", py, core.SdkVersion, strings.Join(extraPackages, ";")), + ) + venvPython := filepath.Join(venvDir, "bin", "python") + + if _, err := os.Stat(venvPython); err != nil { + err := exec.Command(py, "-m", "venv", venvDir).Run() + if err != nil { + return "", errors.Wrap(err, "error creating new virtual environment for python expansion service") + } + err = exec.Command(venvPython, "-m", "pip", "install", "--upgrade", "pip").Run() + if err != nil { + return "", errors.Wrap(err, "error upgrading pip") + } + err = exec.Command(venvPython, "-m", "pip", "install", "--upgrade", "setuptools").Run() + if err != nil { + return "", errors.Wrap(err, "error upgrading setuptools") + } + err = exec.Command(venvPython, "-m", "pip", "install", beamPackage, "pyparsing==2.4.2").Run() + if err != nil { + return "", errors.Wrap(err, fmt.Sprintf("error installing beam package: %v", beamPackage)) + } + if len(extraPackages) > 0 { + cmd := []string{"-m", "pip", "install"} + cmd = append(cmd, extraPackages...) + err = exec.Command(venvPython, cmd...).Run() + if err != nil { + return "", errors.Wrap(err, "error installing dependencies") + } + } + err = exec.Command(venvPython, "-c", "import apache_beam").Run() + if err != nil { + return "", errors.Wrap(err, "apache beam installation failed in virtualenv") + } + } + return venvPython, nil +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go index 80b52933c095b..28dc3294f44fb 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go @@ -29,7 +29,7 @@ import ( // start up a Beam Expansion Service JAR and maintain a handle on the // process running the service to enable shutdown as well. type ExpansionServiceRunner struct { - jarPath string + execPath string servicePort string serviceCommand *exec.Cmd } @@ -55,11 +55,26 @@ func NewExpansionServiceRunner(jarPath, servicePort string) (*ExpansionServiceRu servicePort = fmt.Sprintf("%d", port) } serviceCommand := exec.Command("java", "-jar", jarPath, servicePort) - return &ExpansionServiceRunner{jarPath: jarPath, servicePort: servicePort, serviceCommand: serviceCommand}, nil + return &ExpansionServiceRunner{execPath: jarPath, servicePort: servicePort, serviceCommand: serviceCommand}, nil +} + +// NewExpansionServiceRunner builds an ExpansionServiceRunner struct for a given python module and +// Beam version and returns a pointer to it. Passing an empty string as servicePort will request an +// open port to be assigned to the service. +func NewPyExpansionServiceRunner(pythonExec, module, servicePort string) (*ExpansionServiceRunner, error) { + if servicePort == "" { + port, err := findOpenPort() + if err != nil { + return nil, fmt.Errorf("failed to find open port for service, got %v", err) + } + servicePort = fmt.Sprintf("%d", port) + } + serviceCommand := exec.Command(pythonExec, "-m", module, "-p", servicePort, "--fully_qualified_name_glob=*") + return &ExpansionServiceRunner{execPath: pythonExec, servicePort: servicePort, serviceCommand: serviceCommand}, nil } func (e *ExpansionServiceRunner) String() string { - return fmt.Sprintf("JAR: %v, Port: %v, Process: %v", e.jarPath, e.servicePort, e.serviceCommand.Process) + return fmt.Sprintf("Exec Path: %v, Port: %v, Process: %v", e.execPath, e.servicePort, e.serviceCommand.Process) } // Endpoint returns the formatted endpoint the ExpansionServiceRunner is set to start the expansion diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go index 1410d444c5c57..6c1e229cc6b4b 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go @@ -43,7 +43,7 @@ func TestNewExpansionServiceRunner(t *testing.T) { if err != nil { t.Fatalf("NewExpansionServiceRunner failed, got %v", err) } - if got, want := serviceRunner.jarPath, testPath; got != want { + if got, want := serviceRunner.execPath, testPath; got != want { t.Errorf("got JAR path %v, want %v", got, want) } if got, want := serviceRunner.servicePort, testPort; got != want { @@ -138,7 +138,7 @@ func TestStartService_good(t *testing.T) { } func makeTestServiceRunner() *ExpansionServiceRunner { - return &ExpansionServiceRunner{jarPath: "", serviceCommand: &exec.Cmd{}} + return &ExpansionServiceRunner{execPath: "", serviceCommand: &exec.Cmd{}} } func TestStopService_NoStart(t *testing.T) { diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go index db6f86627abeb..425ed9aa8aa58 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go @@ -232,10 +232,13 @@ func (r *registry) getHandlerFunc(urn, expansionAddr string) (HandlerFunc, strin } // Check this after hardoverrides and URN overrides so those can point to automated expansion // themselves. - if ns == autoJavaNamespace { + switch ns { + case autoJavaNamespace: // Leave expansionAddr unmodified so the autoNamespace keyword sticks. // We strip it manually in the HandlerFunc. return QueryAutomatedExpansionService, expansionAddr + case autoPythonNamespace: + return QueryPythonExpansionService, expansionAddr } // Now that overrides have been handled, we can look up if there's a handler, and return that. @@ -254,6 +257,7 @@ const ( ClasspathSeparator = ";" hardOverrideNamespace = "hardoverride" autoJavaNamespace = "autojava" + autoPythonNamespace = "autopython" ) // Require takes an expansionAddr and requires cross language expansion @@ -295,6 +299,17 @@ func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServ return expansionAddress } +// UseAutomatedPythonExpansionService takes a expansion service module name and creates a +// tagged string to indicate that it should be used to start up an +// automated expansion service for a cross-language expansion. +// +// Intended for use by cross language wrappers to permit spinning +// up an expansion service for a user if no expansion service address +// is provided. +func UseAutomatedPythonExpansionService(service string) string { + return autoPythonNamespace + Separator + service +} + // restricted namespaces to prevent some awkward edge cases. var restricted = map[string]struct{}{ hardOverrideNamespace: {}, // Special handler for overriding.