Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Avro] Use "extensions/avro" instead of avro from"core" in Java SDK modules #24878

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.kafka_clients
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:python")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.examples.complete.kafkatopubsub.avro;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/**
* Example of AVRO serialization class. To configure your AVRO schema, change this class to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer;
import org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.SslConsumerFactoryFn;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.ml.AnnotateText;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.examples.subprocess.utils;

import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** Contains the configuration for the external library. */
@DefaultCoder(AvroCoder.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.beam.examples.complete.game.StatefulTeamScore.UpdateTeamScoreFn;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
Expand Down
1 change: 1 addition & 0 deletions examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
// Add the dependency that sdks:java:core that is marked as provided
implementation library.java.hamcrest
permitUnusedDeclared library.java.hamcrest
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:google-cloud-platform")
implementation library.java.avro
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.google.api.services.bigquery.model.*
import com.google.common.collect.ImmutableList
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.coders.DefaultCoder
import org.apache.beam.sdk.coders.DoubleCoder
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
Expand Down
1 change: 1 addition & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:fn-execution")
implementation project(path: ":sdks:java:extensions:avro")
implementation library.java.vendored_grpc_1_48_1
implementation library.java.vendored_guava_26_0_jre
implementation library.java.classgraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Coder registrar for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;

/** Coder translator for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -48,6 +47,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down
3 changes: 2 additions & 1 deletion runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def dependOnProjects = [":runners:core-construction-java",
":runners:core-java",
":runners:local-java",
":runners:java-fn-execution",
":sdks:java:fn-execution"
":sdks:java:fn-execution",
":sdks:java:extensions:avro"
]

applyJavaNature(
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@SuppressWarnings({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
Expand All @@ -54,6 +53,7 @@
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ dependencies {
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation project(":runners:google-cloud-dataflow-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(path: ":model:fn-execution", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def sdk_provided_shaded_project_dependencies = [
]
def sdk_provided_project_dependencies = [
":runners:google-cloud-dataflow-java",
":sdks:java:extensions:avro",
":sdks:java:extensions:google-cloud-platform-core",
":sdks:java:io:google-cloud-platform",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.avro.Schema;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.extensions.avro.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.OffsetBasedSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
Expand Down Expand Up @@ -63,6 +61,8 @@
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
Expand Down
1 change: 1 addition & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ dependencies {
implementation project(":runners:core-java")
implementation project(":runners:java-fn-execution")
implementation project(":runners:java-job-service")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,22 @@ private Providers() {}
public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> klass) {
Map<String, T> providers = new HashMap<>();
for (T provider : ServiceLoader.load(klass)) {
checkArgument(
!providers.containsKey(provider.identifier()),
"Duplicate providers exist with identifier `%s` for class %s.",
provider.identifier(),
klass);
providers.put(provider.identifier(), provider);
if (provider.identifier().equals("avro")) { // "avro" is a special case
if (provider
.toString()
.startsWith(
"org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
// Use AvroPayloadSerializerProvider only from extensions/avro
providers.put(provider.identifier(), provider);
}
} else {
checkArgument(
!providers.containsKey(provider.identifier()),
"Duplicate providers exist with identifier `%s` for class %s.",
provider.identifier(),
klass);
providers.put(provider.identifier(), provider);
}
}
return providers;
}
Expand Down
1 change: 1 addition & 0 deletions sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ applyJavaNature(
disableLintWarnings: ['rawtypes'], // Avro-generated test code has raw-type errors
publish: false,
exportJavadoc: false,
shadowClosure: {}
)
applyAvroNature()

Expand Down
1 change: 1 addition & 0 deletions sdks/java/extensions/sketching/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
implementation "com.tdunning:t-digest:$tdigest_version"
testImplementation library.java.avro
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct.ApproximateDistinctFn;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.PAssert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.sketching.SketchFrequencies.CountMinSketchFn;
import org.apache.beam.sdk.extensions.sketching.SketchFrequencies.Sketch;
import org.apache.beam.sdk.testing.CoderProperties;
Expand Down
1 change: 1 addition & 0 deletions sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ dependencies {
fmppTask "org.freemarker:freemarker:2.3.31"
fmppTemplates library.java.vendored_calcite_1_28_0
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:join-library")
permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761
implementation project(":sdks:java:extensions:sql:udf")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.avro;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.AvroSchemaIOProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.AvroSchemaIOProvider;
import org.apache.beam.sdk.schemas.io.SchemaIOProvider;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
Expand All @@ -33,7 +34,6 @@
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
Expand Down
Loading