Skip to content

Commit

Permalink
BigQuery Storage Write API wrapper for Python SDK (apache#25685)
Browse files Browse the repository at this point in the history
* discover one config

* creating dlq for the schematransform

* bq storage write api wrapper

* support creating easy Python-using-Java cross-language gradle tasks for a specified expansion service

* add Python-using-Java jenkins tests for GCP expansion service

* Address comments:
integrate with WriteToBigQuery

use shadowJar

move port acquisition closer to expansion service launch

* typo

* address comments and add tests for new bigquery tools

* lint

* skip bq tools test if GCP dependencies not installed

* sync with HEAD, fix encoding positions mismatch

* fix some tests

* return xvr job back to normal

* missing import
  • Loading branch information
ahmedabu98 authored Mar 28, 2023
1 parent f50c8d5 commit fc7a240
Show file tree
Hide file tree
Showing 24 changed files with 1,326 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS


// This job runs end-to-end cross language GCP IO tests with DataflowRunner.
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')


// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)


// Publish all test results to Jenkins
publishers {
archiveJunit('**/pytest*.xml')
}


// Gradle goals for this job.
steps {
CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS.each { pythonVersion ->
shell("echo \"Running cross language GCP IO tests with Python ${pythonVersion} on DataflowRunner.\"")
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(":sdks:python:test-suites:dataflow:py${pythonVersion.replace('.', '')}:gcpCrossLanguagePythonUsingJava")
commonJobProperties.setGradleSwitches(delegate)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS


// This job runs end-to-end cross language GCP IO tests with DirectRunner.
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Direct runner.')


// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)


// Publish all test results to Jenkins
publishers {
archiveJunit('**/pytest*.xml')
}


// Gradle goals for this job.
steps {
CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIONS.each { pythonVersion ->
shell("echo \"Running cross language GCP IO tests with Python ${pythonVersion} on DirectRunner.\"")
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(":sdks:python:test-suites:direct:py${pythonVersion.replace('.', '')}:gcpCrossLanguagePythonUsingJava")
commonJobProperties.setGradleSwitches(delegate)
}
}
}
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,48 @@ class BeamModulePlugin implements Plugin<Project> {
}
}

// A class defining the common properties in a given suite of cross-language tests
// Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object
static class CrossLanguageTaskCommon {
// Used as the task name for cross-language
String name
// The expansion service's project path (required)
String expansionProjectPath
// Collect Python pipeline tests with this marker
String collectMarker
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
}

// A class defining the configuration for CrossLanguageUsingJavaExpansion.
static class CrossLanguageUsingJavaExpansionConfiguration {
// Task name for cross-language tests using Java expansion.
String name = 'crossLanguageUsingJavaExpansion'
// Python pipeline options to use.
List<String> pythonPipelineOptions = [
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_cache_millis=10000",
"--experiments=beam_fn_api",
]
// Additional pytest options
List<String> pytestOptions = []
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
// Number of parallel test runs.
Integer numParallelTests = 1
// Whether the pipeline needs --sdk_location option
boolean needsSdkLocation = false
// Project path for the expansion service to start up
String expansionProjectPath
// Collect Python pipeline tests with this marker
String collectMarker
}

// A class defining the configuration for CrossLanguageValidatesRunner.
static class CrossLanguageValidatesRunnerConfiguration {
// Task name for cross-language validate runner case.
Expand Down Expand Up @@ -2375,6 +2417,108 @@ class BeamModulePlugin implements Plugin<Project> {
}
}

/** ***********************************************************************************************/
// Method to create the createCrossLanguageUsingJavaExpansionTask.
// The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
// This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
project.ext.createCrossLanguageUsingJavaExpansionTask = {
// This task won't work if the python build file doesn't exist.
if (!project.project(":sdks:python").buildFile.exists()) {
System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
return
}
def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()

project.evaluationDependsOn(":sdks:python")
project.evaluationDependsOn(config.expansionProjectPath)
project.evaluationDependsOn(":runners:core-construction-java")
project.evaluationDependsOn(":sdks:java:extensions:python")

// Setting up args to launch the expansion service
def envDir = project.project(":sdks:python").envdir
def pythonDir = project.project(":sdks:python").projectDir
def javaExpansionPort = -1 // will be populated in setupTask
def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
def expansionServiceOpts = [
"group_id": project.name,
"java_expansion_service_jar": expansionJar,
"java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
]
def javaContainerSuffix
if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
javaContainerSuffix = 'java8'
} else if (JavaVersion.current() == JavaVersion.VERSION_11) {
javaContainerSuffix = 'java11'
} else if (JavaVersion.current() == JavaVersion.VERSION_17) {
javaContainerSuffix = 'java17'
} else {
String exceptionMessage = "Your Java version is unsupported. You need Java version of 8 or 11 or 17 to get started, but your Java version is: " + JavaVersion.current();
throw new GradleException(exceptionMessage)
}

// 1. Builds the chosen expansion service jar and launches it
def setupTask = project.tasks.register(config.name+"Setup") {
dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
dependsOn ":sdks:python:installGcpTest"
doLast {
project.exec {
// Prepare a port to use for the expansion service
javaExpansionPort = getRandomPort()
expansionServiceOpts.put("java_port", javaExpansionPort)
// setup test env
def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
}
}
}

// 2. Sets up, collects, and runs Python pipeline tests
def sdkLocationOpt = []
if (config.needsSdkLocation) {
setupTask.configure {dependsOn ':sdks:python:sdist'}
sdkLocationOpt = [
"--sdk_location=${pythonDir}/build/apache-beam.tar.gz"
]
}
def beamPythonTestPipelineOptions = [
"pipeline_opts": config.pythonPipelineOptions + sdkLocationOpt,
"test_opts": config.pytestOptions,
"suite": config.name,
"collect": config.collectMarker,
]
def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)
def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
group = "Verification"
description = "Runs Python SDK pipeline tests that use a Java expansion service"
dependsOn setupTask
dependsOn config.startJobServer
doLast {
project.exec {
environment "EXPANSION_JAR", expansionJar
environment "EXPANSION_PORT", javaExpansionPort
executable 'sh'
args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
}
}
}

// 3. Shuts down the expansion service
def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
// teardown test env
executable 'sh'
args '-c', "$pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}"
}

setupTask.configure {finalizedBy cleanupTask}
config.startJobServer.configure {finalizedBy config.cleanupJobServer}

cleanupTask.configure{mustRunAfter pythonTask}
config.cleanupJobServer.configure{mustRunAfter pythonTask}
}

/** ***********************************************************************************************/

// Method to create the crossLanguageValidatesRunnerTask.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,16 @@ public static Schema of(Field... fields) {
/** Returns an identical Schema with sorted fields. */
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
// {fields, uuid, encodingPositions, options}
// {fields, uuid, options}
// Note: encoding positions are not copied over because generally they should align with the
// ordering of field indices. Otherwise, problems may occur when encoding/decoding Rows of
// this schema.
Schema sortedSchema =
this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.collect(Schema.toSchema())
.withOptions(getOptions());
sortedSchema.setUUID(getUUID());
sortedSchema.setEncodingPositions(getEncodingPositions());

return sortedSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ public void testSorted() {
.addStringField("d")
.build()
.withOptions(testOptions);
sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions());

assertEquals(true, unorderedSchema.equivalent(unorderedSchemaAfterSorting));
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public class ExpansionServiceSchemaTransformProviderTest {

private static final Schema TEST_SCHEMATRANSFORM_CONFIG_SCHEMA =
Schema.of(
Field.of("str1", FieldType.STRING),
Field.of("str2", FieldType.STRING),
Field.of("int1", FieldType.INT32),
Field.of("int2", FieldType.INT32));
Field.of("int2", FieldType.INT32),
Field.of("str1", FieldType.STRING),
Field.of("str2", FieldType.STRING));

private ExpansionService expansionService = new ExpansionService();

Expand Down Expand Up @@ -381,10 +381,10 @@ public void testSchemaTransformExpansion() {
.values());
Row configRow =
Row.withSchema(TEST_SCHEMATRANSFORM_CONFIG_SCHEMA)
.withFieldValue("str1", "aaa")
.withFieldValue("str2", "bbb")
.withFieldValue("int1", 111)
.withFieldValue("int2", 222)
.withFieldValue("str1", "aaa")
.withFieldValue("str2", "bbb")
.build();

ByteStringOutputStream outputStream = new ByteStringOutputStream();
Expand Down Expand Up @@ -440,10 +440,10 @@ public void testSchemaTransformExpansionMultiInputMultiOutput() {

Row configRow =
Row.withSchema(TEST_SCHEMATRANSFORM_CONFIG_SCHEMA)
.withFieldValue("str1", "aaa")
.withFieldValue("str2", "bbb")
.withFieldValue("int1", 111)
.withFieldValue("int2", 222)
.withFieldValue("str1", "aaa")
.withFieldValue("str2", "bbb")
.build();

ByteStringOutputStream outputStream = new ByteStringOutputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class BeamRowToStorageApiProto {
.put(
SqlTypes.DATETIME.getIdentifier(),
(logicalType, value) ->
CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value))
.put(
SqlTypes.TIMESTAMP.getIdentifier(),
(logicalType, value) -> (ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public static TableRow convertGenericRecordToTableRow(
return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
}

/** Convert a BigQuery TableRow to a Beam Row. */
/** Convert a Beam Row to a BigQuery TableRow. */
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
Expand Down Expand Up @@ -686,7 +686,14 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
if (JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
try {
// Handle if datetime value is in micros ie. 123456789
Long value = Long.parseLong(jsonBQString);
return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value);
} catch (NumberFormatException e) {
// Handle as a String, ie. "2023-02-16 12:00:00"
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
}
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.TIME.getIdentifier())) {
Expand Down
Loading

0 comments on commit fc7a240

Please sign in to comment.