From 42946a60c429754fb39cdafaac4c76f00315a0ef Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 11 Oct 2022 15:59:33 -0400 Subject: [PATCH 1/6] automated python expansion service --- .../go/pkg/beam/core/runtime/xlangx/expand.go | 71 +++++++++++++++++++ .../beam/core/runtime/xlangx/expand_test.go | 33 +++++++++ .../runtime/xlangx/expansionx/download.go | 70 ++++++++++++++++++ .../core/runtime/xlangx/expansionx/process.go | 21 +++++- .../runtime/xlangx/expansionx/process_test.go | 4 +- .../pkg/beam/core/runtime/xlangx/registry.go | 15 ++++ 6 files changed, 209 insertions(+), 5 deletions(-) create mode 100644 sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go index 62dc5296d472d..ae2445dd60b6a 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "google.golang.org/grpc" @@ -231,3 +232,73 @@ func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*job // been via pointer. return res, nil } + +func startPythonExpansionService(service, extraPackage string) (stopFunc func() error, address string, err error) { + venvPython, err := expansionx.SetUpPythonEnvironment(extraPackage) + if err != nil { + return nil, "", err + } + log.Debugf(context.Background(), "path: %v", venvPython) + + serviceRunner, err := expansionx.NewPyExpansionServiceRunner(venvPython, service, "") + if err != nil { + return nil, "", fmt.Errorf("error in startAutomatedPythonExpansionService(%s,%s): %w", venvPython, service, err) + } + err = serviceRunner.StartService() + if err != nil { + return nil, "", fmt.Errorf("error in starting expansion service, StartService(): %w", err) + } + stopFunc = serviceRunner.StopService + address = serviceRunner.Endpoint() + return stopFunc, address, nil +} + +// QueryPythonExpansionService submits an external python transform to be expanded by the +// expansion service and then eagerly materializes the artifacts for staging. The given +// transform should be the external transform, and the components are any additional +// components necessary for the pipeline snippet. +// +// The address to be queried is determined by the Config field of the HandlerParams after +// the prefix tag indicating the automated service is in use. +func QueryPythonExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error) { + // Strip autoPython: tag to get actual python module + tag, module := parseAddr(p.Config) + // parse extra-packages from namespace if present + module, extraPackages := parseClasspath(module) + + stopFunc, address, err := startPythonExpansionService(module, extraPackages) + if err != nil { + return nil, err + } + defer stopFunc() + + p.Config = address + + res, err := QueryExpansionService(ctx, p) + if err != nil { + return nil, err + } + + exp := &graph.ExpandedTransform{ + Components: res.GetComponents(), + Transform: res.GetTransform(), + Requirements: res.GetRequirements(), + } + + p.ext.Expanded = exp + // Put correct expansion address into edge + p.edge.External.ExpansionAddr = address + + _, err = ResolveArtifactsWithConfig(ctx, []*graph.MultiEdge{p.edge}, ResolveConfig{}) + + if err != nil { + return nil, err + } + + // Restore tag so we know the artifacts have been materialized eagerly down the road. + p.edge.External.ExpansionAddr = tag + Separator + module + + // Can return the original response because all of our proto modification afterwards has + // been via pointer. + return res, nil +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go new file mode 100644 index 0000000000000..f91bafa4ce148 --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xlangx + +import ( + "testing" +) + +func TestStartAutomated(t *testing.T) { + sp, addr, err := startPythonExpansionService("apache_beam.runners.portability.expansion_service_main", "") + if err != nil { + t.Fatal(err) + } + if addr == "" { + t.Fatal("no address") + } + if err := sp(); err != nil { + t.Fatal("error stoping service") + } +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go index 12488638faba4..72714862e42ad 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go @@ -23,10 +23,15 @@ import ( "io" "net/http" "os" + "os/exec" "os/user" "path" "path/filepath" + "sort" "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) type url string @@ -375,3 +380,68 @@ func jarExists(jarPath string) bool { _, err := os.Stat(jarPath) return err == nil } + +func getPythonVersion() (string, error) { + for _, v := range []string{"python", "python3"} { + cmd := exec.Command(v, "--version") + if err := cmd.Run(); err == nil { + return v, nil + } + } + return "", fmt.Errorf("no python installation found") +} + +// SetUpPythonEnvironment sets up the virtual ennvironment required for the +// Apache Beam Python SDK to run an expansion service module. +func SetUpPythonEnvironment(extraPackage string) (string, error) { + py, err := getPythonVersion() + if err != nil { + return "", fmt.Errorf("no python installation found: %v", err) + } + extraPackages := []string{} + if len(extraPackage) > 0 { + extraPackages = strings.Split(extraPackage, " ") + } + + // create python virtual environment + sort.Strings(extraPackages) + beamPackage := fmt.Sprintf("apache_beam[gcp,aws,asure,dataframe]==%s", core.SdkVersion) + cacheDir := "~/.apache_beam/cache" + venvDir := filepath.Join( + cacheDir, "venvs", + fmt.Sprintf("py-%s-beam-%s-%s", py, core.SdkVersion, strings.Join(extraPackages, ";")), + ) + venvPython := filepath.Join(venvDir, "bin", "python") + + if _, err := os.Stat(venvPython); err != nil { + err := exec.Command(py, "-m", "venv", venvDir).Run() + if err != nil { + return "", errors.Wrap(err, "error creating new virtual environment for python expansion service") + } + err = exec.Command(venvPython, "-m", "pip", "install", "--upgrade", "pip").Run() + if err != nil { + return "", errors.Wrap(err, "error upgrading pip") + } + err = exec.Command(venvPython, "-m", "pip", "install", "--upgrade", "setuptools").Run() + if err != nil { + return "", errors.Wrap(err, "error upgrading setuptools") + } + err = exec.Command(venvPython, "-m", "pip", "install", beamPackage, "pyparsing==2.4.2").Run() + if err != nil { + return "", errors.Wrap(err, fmt.Sprintf("error installing beam package: %v", beamPackage)) + } + if len(extraPackages) > 0 { + cmd := []string{"-m", "pip", "install"} + cmd = append(cmd, extraPackages...) + err = exec.Command(venvPython, cmd...).Run() + if err != nil { + return "", errors.Wrap(err, "error installing dependencies") + } + } + err = exec.Command(venvPython, "-c", "import apache_beam").Run() + if err != nil { + return "", errors.Wrap(err, "apache beam installation failed in virtualenv") + } + } + return venvPython, nil +} diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go index 80b52933c095b..28dc3294f44fb 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go @@ -29,7 +29,7 @@ import ( // start up a Beam Expansion Service JAR and maintain a handle on the // process running the service to enable shutdown as well. type ExpansionServiceRunner struct { - jarPath string + execPath string servicePort string serviceCommand *exec.Cmd } @@ -55,11 +55,26 @@ func NewExpansionServiceRunner(jarPath, servicePort string) (*ExpansionServiceRu servicePort = fmt.Sprintf("%d", port) } serviceCommand := exec.Command("java", "-jar", jarPath, servicePort) - return &ExpansionServiceRunner{jarPath: jarPath, servicePort: servicePort, serviceCommand: serviceCommand}, nil + return &ExpansionServiceRunner{execPath: jarPath, servicePort: servicePort, serviceCommand: serviceCommand}, nil +} + +// NewExpansionServiceRunner builds an ExpansionServiceRunner struct for a given python module and +// Beam version and returns a pointer to it. Passing an empty string as servicePort will request an +// open port to be assigned to the service. +func NewPyExpansionServiceRunner(pythonExec, module, servicePort string) (*ExpansionServiceRunner, error) { + if servicePort == "" { + port, err := findOpenPort() + if err != nil { + return nil, fmt.Errorf("failed to find open port for service, got %v", err) + } + servicePort = fmt.Sprintf("%d", port) + } + serviceCommand := exec.Command(pythonExec, "-m", module, "-p", servicePort, "--fully_qualified_name_glob=*") + return &ExpansionServiceRunner{execPath: pythonExec, servicePort: servicePort, serviceCommand: serviceCommand}, nil } func (e *ExpansionServiceRunner) String() string { - return fmt.Sprintf("JAR: %v, Port: %v, Process: %v", e.jarPath, e.servicePort, e.serviceCommand.Process) + return fmt.Sprintf("Exec Path: %v, Port: %v, Process: %v", e.execPath, e.servicePort, e.serviceCommand.Process) } // Endpoint returns the formatted endpoint the ExpansionServiceRunner is set to start the expansion diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go index 1410d444c5c57..6c1e229cc6b4b 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process_test.go @@ -43,7 +43,7 @@ func TestNewExpansionServiceRunner(t *testing.T) { if err != nil { t.Fatalf("NewExpansionServiceRunner failed, got %v", err) } - if got, want := serviceRunner.jarPath, testPath; got != want { + if got, want := serviceRunner.execPath, testPath; got != want { t.Errorf("got JAR path %v, want %v", got, want) } if got, want := serviceRunner.servicePort, testPort; got != want { @@ -138,7 +138,7 @@ func TestStartService_good(t *testing.T) { } func makeTestServiceRunner() *ExpansionServiceRunner { - return &ExpansionServiceRunner{jarPath: "", serviceCommand: &exec.Cmd{}} + return &ExpansionServiceRunner{execPath: "", serviceCommand: &exec.Cmd{}} } func TestStopService_NoStart(t *testing.T) { diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go index db6f86627abeb..92ab8e03c4174 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go @@ -236,6 +236,8 @@ func (r *registry) getHandlerFunc(urn, expansionAddr string) (HandlerFunc, strin // Leave expansionAddr unmodified so the autoNamespace keyword sticks. // We strip it manually in the HandlerFunc. return QueryAutomatedExpansionService, expansionAddr + } else if ns == autoPythonNamespace { + return QueryPythonExpansionService, expansionAddr } // Now that overrides have been handled, we can look up if there's a handler, and return that. @@ -254,6 +256,7 @@ const ( ClasspathSeparator = ";" hardOverrideNamespace = "hardoverride" autoJavaNamespace = "autojava" + autoPythonNamespace = "autopython" ) // Require takes an expansionAddr and requires cross language expansion @@ -295,6 +298,18 @@ func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServ return expansionAddress } +// UseAutomatedPythonExpansionService takes a expansion service module name and creates a +// tagged string to indicate that it should be used to start up an +// automated expansion service for a cross-language expansion. +// +// Intended for use by cross language wrappers to permit spinning +// up an expansion service for a user if no expansion service address +// is provided. +func UseAutomatedPythonExpansionService(service string) string { + expansionAddress := autoPythonNamespace + Separator + service + return expansionAddress +} + // restricted namespaces to prevent some awkward edge cases. var restricted = map[string]struct{}{ hardOverrideNamespace: {}, // Special handler for overriding. From 2f1aab81ff5597ee039f67df73d2d9526d09fcd9 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 11 Oct 2022 16:27:50 -0400 Subject: [PATCH 2/6] filter test --- sdks/go/test/integration/integration.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index b70a861064bc0..8c000fa9f05ae 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -100,6 +100,8 @@ var directFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } var portableFilters = []string{ @@ -131,6 +133,8 @@ var portableFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } var flinkFilters = []string{ @@ -154,6 +158,8 @@ var flinkFilters = []string{ "TestMapStateClear", "TestSetStateClear", "TestSetState", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } var samzaFilters = []string{ @@ -188,6 +194,8 @@ var samzaFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } var sparkFilters = []string{ @@ -217,6 +225,8 @@ var sparkFilters = []string{ "TestMapStateClear", "TestSetStateClear", "TestSetState", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } var dataflowFilters = []string{ @@ -243,6 +253,8 @@ var dataflowFilters = []string{ "TestBigQueryIO_BasicWriteQueryRead", // Dataflow does not drain jobs by itself. "TestDrain", + // The test uses core.SdkVersion but needs a released version. + "TestStartAutomated", } // CheckFilters checks if an integration test is filtered to be skipped, either From c8ff5fa2f96029ebaae6776520612f7ccf28b5c3 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 11 Oct 2022 16:46:07 -0400 Subject: [PATCH 3/6] filter test for dev versions --- sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go index f91bafa4ce148..c7c90b65a9996 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand_test.go @@ -16,10 +16,16 @@ package xlangx import ( + "strings" "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" ) func TestStartAutomated(t *testing.T) { + if strings.HasSuffix(core.SdkVersion, ".dev") { + t.Skipf("need a released SDK version to test auto python expansion service, got: %s", core.SdkVersion) + } sp, addr, err := startPythonExpansionService("apache_beam.runners.portability.expansion_service_main", "") if err != nil { t.Fatal(err) From 5642ac76e3235814bf745b02ce69479c748d5a5e Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 11 Oct 2022 16:47:52 -0400 Subject: [PATCH 4/6] test --- sdks/go/test/integration/integration.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 8c000fa9f05ae..b70a861064bc0 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -100,8 +100,6 @@ var directFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } var portableFilters = []string{ @@ -133,8 +131,6 @@ var portableFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } var flinkFilters = []string{ @@ -158,8 +154,6 @@ var flinkFilters = []string{ "TestMapStateClear", "TestSetStateClear", "TestSetState", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } var samzaFilters = []string{ @@ -194,8 +188,6 @@ var samzaFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } var sparkFilters = []string{ @@ -225,8 +217,6 @@ var sparkFilters = []string{ "TestMapStateClear", "TestSetStateClear", "TestSetState", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } var dataflowFilters = []string{ @@ -253,8 +243,6 @@ var dataflowFilters = []string{ "TestBigQueryIO_BasicWriteQueryRead", // Dataflow does not drain jobs by itself. "TestDrain", - // The test uses core.SdkVersion but needs a released version. - "TestStartAutomated", } // CheckFilters checks if an integration test is filtered to be skipped, either From 0b66415bf8846cf0a361644f90b99e21bd4814a7 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 12 Oct 2022 10:27:54 -0400 Subject: [PATCH 5/6] update namings --- .../pkg/beam/core/runtime/xlangx/expansionx/download.go | 6 +++--- sdks/go/pkg/beam/core/runtime/xlangx/registry.go | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go index 72714862e42ad..0a7eb482cc407 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go @@ -47,7 +47,8 @@ var defaultJarGetter = newJarGetter() const ( apacheRepository = url("https://repo.maven.apache.org/maven2") beamGroupID = "org/apache/beam" - jarCache = "~/.apache_beam/cache/jars" + cacheDir = "~/.apache_beam/cache" + jarCache = cacheDir + "/jars" ) func newJarGetter() *jarGetter { @@ -405,8 +406,7 @@ func SetUpPythonEnvironment(extraPackage string) (string, error) { // create python virtual environment sort.Strings(extraPackages) - beamPackage := fmt.Sprintf("apache_beam[gcp,aws,asure,dataframe]==%s", core.SdkVersion) - cacheDir := "~/.apache_beam/cache" + beamPackage := fmt.Sprintf("apache_beam[gcp,aws,azure,dataframe]==%s", core.SdkVersion) venvDir := filepath.Join( cacheDir, "venvs", fmt.Sprintf("py-%s-beam-%s-%s", py, core.SdkVersion, strings.Join(extraPackages, ";")), diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go index 92ab8e03c4174..a3d1477a1d54f 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go @@ -232,11 +232,12 @@ func (r *registry) getHandlerFunc(urn, expansionAddr string) (HandlerFunc, strin } // Check this after hardoverrides and URN overrides so those can point to automated expansion // themselves. - if ns == autoJavaNamespace { + switch ns { + case autoJavaNamespace: // Leave expansionAddr unmodified so the autoNamespace keyword sticks. // We strip it manually in the HandlerFunc. return QueryAutomatedExpansionService, expansionAddr - } else if ns == autoPythonNamespace { + case autoPythonNamespace: return QueryPythonExpansionService, expansionAddr } @@ -306,8 +307,8 @@ func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServ // up an expansion service for a user if no expansion service address // is provided. func UseAutomatedPythonExpansionService(service string) string { - expansionAddress := autoPythonNamespace + Separator + service - return expansionAddress + return autoPythonNamespace + Separator + service + } // restricted namespaces to prevent some awkward edge cases. From d014585c4641467af05bc89ffb8e934e45a49652 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 18 Oct 2022 09:53:48 -0400 Subject: [PATCH 6/6] rm line --- sdks/go/pkg/beam/core/runtime/xlangx/registry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go index a3d1477a1d54f..425ed9aa8aa58 100644 --- a/sdks/go/pkg/beam/core/runtime/xlangx/registry.go +++ b/sdks/go/pkg/beam/core/runtime/xlangx/registry.go @@ -308,7 +308,6 @@ func UseAutomatedJavaExpansionService(gradleTarget string, opts ...ExpansionServ // is provided. func UseAutomatedPythonExpansionService(service string) string { return autoPythonNamespace + Separator + service - } // restricted namespaces to prevent some awkward edge cases.