Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PubsubWriteSchemaTransformProvider #24443

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.DateTime;
Expand All @@ -30,16 +31,25 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

/** An (abstract) helper class for talking to Pubsub via an underlying transport. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class PubsubClient implements Closeable {
private static final Map<String, SerializableFunction<String, Schema>>
schemaTypeToConversionFnMap =
ImmutableMap.of(
com.google.pubsub.v1.Schema.Type.AVRO.name(), new PubsubAvroDefinitionToSchemaFn());

/** Factory for creating clients. */
public interface PubsubClientFactory extends Serializable {
/**
Expand Down Expand Up @@ -170,6 +180,53 @@ public static ProjectPath projectPathFromId(String projectId) {
return new ProjectPath(String.format("projects/%s", projectId));
}

/** Path representing a Pubsub schema. */
public static class SchemaPath implements Serializable {
static final String DELETED_SCHEMA_PATH = "_deleted-schema_";
static final SchemaPath DELETED_SCHEMA = new SchemaPath("", DELETED_SCHEMA_PATH);
private final String projectId;
private final String schemaId;

SchemaPath(String projectId, String schemaId) {
this.projectId = projectId;
this.schemaId = schemaId;
}

SchemaPath(String path) {
List<String> splits = Splitter.on('/').splitToList(path);
checkState(
splits.size() == 4 && "projects".equals(splits.get(0)) && "schemas".equals(splits.get(2)),
"Malformed schema path %s: "
+ "must be of the form \"projects/\" + <project id> + \"schemas\"",
path);
this.projectId = splits.get(1);
this.schemaId = splits.get(3);
}

public String getPath() {
if (schemaId.equals(DELETED_SCHEMA_PATH)) {
return DELETED_SCHEMA_PATH;
}
return String.format("projects/%s/schemas/%s", projectId, schemaId);
}

public String getId() {
return schemaId;
}

public String getProjectId() {
return projectId;
}
}

public static SchemaPath schemaPathFromPath(String path) {
return new SchemaPath(path);
}

public static SchemaPath schemaPathFromId(String projectId, String schemaId) {
return new SchemaPath(projectId, schemaId);
}

/** Path representing a Pubsub subscription. */
public static class SubscriptionPath implements Serializable {
private final String projectId;
Expand Down Expand Up @@ -403,6 +460,9 @@ public abstract void modifyAckDeadline(
/** Create {@code topic}. */
public abstract void createTopic(TopicPath topic) throws IOException;

/** Create {link TopicPath} with {@link SchemaPath}. */
public abstract void createTopic(TopicPath topic, SchemaPath schema) throws IOException;

/*
* Delete {@code topic}.
*/
Expand Down Expand Up @@ -445,4 +505,51 @@ public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, To
* messages have been pulled and the test may complete.
*/
public abstract boolean isEOF();

/** Create {@link com.google.api.services.pubsub.model.Schema} from resource path. */
public abstract void createSchema(
SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type)
throws IOException;

/** Delete {@link SchemaPath}. */
public abstract void deleteSchema(SchemaPath schemaPath) throws IOException;

/** Return {@link SchemaPath} from {@link TopicPath} if exists. */
public abstract SchemaPath getSchemaPath(TopicPath topicPath) throws IOException;

/** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. */
public abstract Schema getSchema(SchemaPath schemaPath) throws IOException;

/** Convert a {@link com.google.api.services.pubsub.model.Schema} to a Beam {@link Schema}. */
static Schema fromPubsubSchema(com.google.api.services.pubsub.model.Schema pubsubSchema) {
if (!schemaTypeToConversionFnMap.containsKey(pubsubSchema.getType())) {
throw new IllegalArgumentException(
String.format(
"Pub/Sub schema type %s is not supported at this time", pubsubSchema.getType()));
}
SerializableFunction<String, Schema> definitionToSchemaFn =
schemaTypeToConversionFnMap.get(pubsubSchema.getType());
return definitionToSchemaFn.apply(pubsubSchema.getDefinition());
}

/** Convert a {@link com.google.pubsub.v1.Schema} to a Beam {@link Schema}. */
static Schema fromPubsubSchema(com.google.pubsub.v1.Schema pubsubSchema) {
String typeName = pubsubSchema.getType().name();
if (!schemaTypeToConversionFnMap.containsKey(typeName)) {
throw new IllegalArgumentException(
String.format("Pub/Sub schema type %s is not supported at this time", typeName));
}
SerializableFunction<String, Schema> definitionToSchemaFn =
schemaTypeToConversionFnMap.get(typeName);
return definitionToSchemaFn.apply(pubsubSchema.getDefinition());
}

static class PubsubAvroDefinitionToSchemaFn implements SerializableFunction<String, Schema> {
@Override
public Schema apply(String definition) {
checkNotNull(definition, "Pub/Sub schema definition is null");
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(definition);
return AvroUtils.toBeamSchema(avroSchema);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import com.google.auth.Credentials;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.CreateSchemaRequest;
import com.google.pubsub.v1.DeleteSchemaRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.GetSchemaRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.ListSubscriptionsRequest;
import com.google.pubsub.v1.ListSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
Expand All @@ -39,6 +44,9 @@
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SchemaServiceGrpc;
import com.google.pubsub.v1.SchemaServiceGrpc.SchemaServiceBlockingStub;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
import com.google.pubsub.v1.Subscription;
Expand All @@ -51,11 +59,17 @@
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
Expand Down Expand Up @@ -137,6 +151,8 @@ public String getKind() {

private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;

private SchemaServiceGrpc.SchemaServiceBlockingStub cachedSchemaServiceStub;

@VisibleForTesting
PubsubGrpcClient(
@Nullable String timestampAttribute,
Expand All @@ -161,6 +177,7 @@ public void close() {
// Can gc the underlying stubs.
cachedPublisherStub = null;
cachedSubscriberStub = null;
cachedSchemaServiceStub = null;
// Mark the client as having been closed before going further
// in case we have an exception from the channel.
ManagedChannel publisherChannel = this.publisherChannel;
Expand Down Expand Up @@ -205,6 +222,14 @@ private SubscriberBlockingStub subscriberStub() throws IOException {
return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
}

/** Return a stub for making a schema service request with a timeout. */
private SchemaServiceBlockingStub schemaServiceStub() throws IOException {
if (cachedSchemaServiceStub == null) {
cachedSchemaServiceStub = SchemaServiceGrpc.newBlockingStub(newChannel());
}
return cachedSchemaServiceStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
}

@Override
public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath());
Expand Down Expand Up @@ -310,6 +335,20 @@ public void createTopic(TopicPath topic) throws IOException {
publisherStub().createTopic(request); // ignore Topic result.
}

@Override
public void createTopic(TopicPath topic, SchemaPath schema) throws IOException {
Topic request =
Topic.newBuilder()
.setName(topic.getPath())
.setSchemaSettings(
SchemaSettings.newBuilder()
.setSchema(schema.getPath())
.setEncoding(Encoding.BINARY)
.build())
.build();
publisherStub().createTopic(request); // ignore Topic result.
}

@Override
public void deleteTopic(TopicPath topic) throws IOException {
DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic.getPath()).build();
Expand Down Expand Up @@ -396,4 +435,62 @@ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException
public boolean isEOF() {
return false;
}

/** Create {@link com.google.pubsub.v1.Schema} from resource path. */
@Override
public void createSchema(
SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type)
throws IOException {

Path path =
FileSystems.getDefault()
.getPath(
Objects.requireNonNull(PubsubGrpcClient.class.getResource(resourcePath)).getPath());
byte[] b = Files.readAllBytes(path);
String definition = new String(b, StandardCharsets.UTF_8);

CreateSchemaRequest request =
CreateSchemaRequest.newBuilder()
.setSchemaId(schemaPath.getId())
.setParent("projects/" + schemaPath.getProjectId())
.setSchema(
com.google.pubsub.v1.Schema.newBuilder()
.setType(type)
.setDefinition(definition)
.build())
.build();

schemaServiceStub().createSchema(request); // Result is ignored
}

/** Delete {@link SchemaPath}. */
@Override
public void deleteSchema(SchemaPath schemaPath) throws IOException {
DeleteSchemaRequest request =
DeleteSchemaRequest.newBuilder().setName(schemaPath.getPath()).build();
schemaServiceStub().deleteSchema(request);
}

/** Return {@link SchemaPath} from {@link TopicPath} if exists. */
@Override
public SchemaPath getSchemaPath(TopicPath topicPath) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topicPath.getPath()).build();
Topic topic = publisherStub().getTopic(request);
SchemaSettings schemaSettings = topic.getSchemaSettings();
if (schemaSettings.getSchema().isEmpty()) {
return null;
}
String schemaPath = schemaSettings.getSchema();
if (schemaPath.equals(SchemaPath.DELETED_SCHEMA_PATH)) {
return null;
}
return PubsubClient.schemaPathFromPath(schemaPath);
}

/** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. */
@Override
public Schema getSchema(SchemaPath schemaPath) throws IOException {
GetSchemaRequest request = GetSchemaRequest.newBuilder().setName(schemaPath.getPath()).build();
return fromPubsubSchema(schemaServiceStub().getSchema(request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.TreeMap;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
Expand Down Expand Up @@ -277,6 +278,11 @@ public void createTopic(TopicPath topic) throws IOException {
.execute(); // ignore Topic result.
}

@Override
public void createTopic(TopicPath topic, SchemaPath schema) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void deleteTopic(TopicPath topic) throws IOException {
pubsub.projects().topics().delete(topic.getPath()).execute(); // ignore Empty result.
Expand Down Expand Up @@ -358,4 +364,40 @@ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException
public boolean isEOF() {
return false;
}

/** Create {@link com.google.api.services.pubsub.model.Schema} from resource path. */
@Override
public void createSchema(
SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type)
throws IOException {
throw new UnsupportedOperationException();
}

/** Delete {@link SchemaPath}. */
@Override
public void deleteSchema(SchemaPath schemaPath) throws IOException {
throw new UnsupportedOperationException();
}

/** Return {@link SchemaPath} from {@link TopicPath} if exists. */
@Override
public SchemaPath getSchemaPath(TopicPath topicPath) throws IOException {
Topic topic = pubsub.projects().topics().get(topicPath.getPath()).execute();
if (topic.getSchemaSettings() == null) {
return null;
}
String schemaPath = topic.getSchemaSettings().getSchema();
if (schemaPath.equals(SchemaPath.DELETED_SCHEMA_PATH)) {
return null;
}
return PubsubClient.schemaPathFromPath(schemaPath);
}

/** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. */
@Override
public Schema getSchema(SchemaPath schemaPath) throws IOException {
com.google.api.services.pubsub.model.Schema pubsubSchema =
pubsub.projects().schemas().get(schemaPath.getPath()).execute();
return fromPubsubSchema(pubsubSchema);
}
}
Loading