From 29653f8b921162e9d41f27d9cc5ee38d5eecb49d Mon Sep 17 00:00:00 2001 From: cmach Date: Tue, 29 Oct 2019 21:55:28 -0700 Subject: [PATCH] first draft --- .../sdk/io/aws2/sqs/SqsCheckpointMark.java | 66 ++++++ .../sdk/io/aws2/sqs/SqsConfiguration.java | 79 +++++++ .../apache/beam/sdk/io/aws2/sqs/SqsIO.java | 217 ++++++++++++++++++ .../sdk/io/aws2/sqs/SqsUnboundedReader.java | 167 ++++++++++++++ .../sdk/io/aws2/sqs/SqsUnboundedSource.java | 94 ++++++++ .../beam/sdk/io/aws2/sqs/package-info.java | 19 ++ .../sdk/io/aws2/sqs/SqsCheckpointMark.java | 66 ++++++ .../sdk/io/aws2/sqs/SqsConfiguration.java | 79 +++++++ .../apache/beam/sdk/io/aws2/sqs/SqsIO.java | 217 ++++++++++++++++++ .../sdk/io/aws2/sqs/SqsUnboundedReader.java | 167 ++++++++++++++ .../sdk/io/aws2/sqs/SqsUnboundedSource.java | 94 ++++++++ .../beam/sdk/io/aws2/sqs/package-info.java | 19 ++ 12 files changed, 1284 insertions(+) create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java create mode 100644 sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java new file mode 100644 index 0000000000000..03dc88b289c9c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.model.Message; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { + + private final List messagesToDelete; + private final transient Optional reader; + + public SqsCheckpointMark(SqsUnboundedReader reader, Collection messagesToDelete) { + this.reader = Optional.of(reader); + this.messagesToDelete = ImmutableList.copyOf(messagesToDelete); + ; + } + + @Override + public void finalizeCheckpoint() { + reader.ifPresent(r -> r.delete(messagesToDelete)); + } + + List getMessagesToDelete() { + return messagesToDelete; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqsCheckpointMark that = (SqsCheckpointMark) o; + return Objects.equal(messagesToDelete, that.messagesToDelete); + } + + @Override + public int hashCode() { + return Objects.hashCode(messagesToDelete); + } +} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java new file mode 100644 index 0000000000000..bb971f87ec93d --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.io.aws.options.AwsModule; +import org.apache.beam.sdk.io.aws.options.AwsOptions; + +import java.io.IOException; +import java.io.Serializable; + +class SqsConfiguration implements Serializable { + + private String awsRegion; + private String awsCredentialsProviderString; + private String awsClientConfigurationString; + + public SqsConfiguration(AwsOptions awsOptions) { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + this.awsCredentialsProviderString = + om.writeValueAsString(awsOptions.getAwsCredentialsProvider()); + } catch (JsonProcessingException e) { + this.awsCredentialsProviderString = null; + } + + try { + this.awsClientConfigurationString = + om.writeValueAsString(awsOptions.getClientConfiguration()); + } catch (JsonProcessingException e) { + this.awsClientConfigurationString = null; + } + + this.awsRegion = awsOptions.getAwsRegion(); + } + + public AWSCredentialsProvider getAwsCredentialsProvider() { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + return om.readValue(awsCredentialsProviderString, AWSCredentialsProvider.class); + } catch (IOException e) { + return null; + } + } + + public ClientConfiguration getClientConfiguration() { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + return om.readValue(awsClientConfigurationString, ClientConfiguration.class); + } catch (IOException e) { + return null; + } + } + + public String getAwsRegion() { + return awsRegion; + } +} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java new file mode 100644 index 0000000000000..2f9eb3d1a3e42 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.aws.options.AwsOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; + +import javax.annotation.Nullable; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +/** + * An unbounded source for Amazon Simple Queue Service (SQS). + * + *

Reading from an SQS queue

+ * + *

The {@link SqsIO} {@link Read} returns an unbounded {@link PCollection} of {@link + * com.amazonaws.services.sqs.model.Message} containing the received messages. Note: This source + * does not currently advance the watermark when no new messages are received. + * + *

To configure an SQS source, you have to provide the queueUrl to connect to. The following + * example illustrates how to configure the source: + * + *

{@code
+ * pipeline.apply(SqsIO.read().withQueueUrl(queueUrl))
+ * }
+ * + *

Writing to an SQS queue

+ * + *

The following example illustrates how to use the sink: + * + *

{@code
+ * pipeline
+ *   .apply(...) // returns PCollection
+ *   .apply(SqsIO.write())
+ * }
+ * + *

Additional Configuration

+ * + *

Additional configuration can be provided via {@link AwsOptions} from command line args or in + * code. For example, if you wanted to provide a secret access key via code: + * + *

{@code
+ * PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().create();
+ * AwsOptions awsOptions = pipelineOptions.as(AwsOptions.class);
+ * BasicAWSCredentials awsCreds = new BasicAWSCredentials("accesskey", "secretkey");
+ * awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
+ * Pipeline pipeline = Pipeline.create(options);
+ * }
+ * + *

For more information on the available options see {@link AwsOptions}. + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class SqsIO { + + public static Read read() { + return new AutoValue_SqsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); + } + + public static Write write() { + return new AutoValue_SqsIO_Write.Builder().build(); + } + + private SqsIO() {} + + /** + * A {@link PTransform} to read/receive messages from SQS. See {@link SqsIO} for more information + * on usage and configuration. + */ + @AutoValue + public abstract static class Read extends PTransform> { + + @Nullable + abstract String queueUrl(); + + abstract long maxNumRecords(); + + @Nullable + abstract Duration maxReadTime(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setQueueUrl(String queueUrl); + + abstract Builder setMaxNumRecords(long maxNumRecords); + + abstract Builder setMaxReadTime(Duration maxReadTime); + + abstract Read build(); + } + + /** + * Define the max number of records received by the {@link Read}. When the max number of records + * is lower than {@code Long.MAX_VALUE}, the {@link Read} will provide a bounded {@link + * PCollection}. + */ + public Read withMaxNumRecords(long maxNumRecords) { + return toBuilder().setMaxNumRecords(maxNumRecords).build(); + } + + /** + * Define the max read time (duration) while the {@link Read} will receive messages. When this + * max read time is not null, the {@link Read} will provide a bounded {@link PCollection}. + */ + public Read withMaxReadTime(Duration maxReadTime) { + return toBuilder().setMaxReadTime(maxReadTime).build(); + } + + /** Define the queueUrl used by the {@link Read} to receive messages from SQS. */ + public Read withQueueUrl(String queueUrl) { + checkArgument(queueUrl != null, "queueUrl can not be null"); + checkArgument(!queueUrl.isEmpty(), "queueUrl can not be empty"); + return toBuilder().setQueueUrl(queueUrl).build(); + } + + @Override + public PCollection expand(PBegin input) { + + org.apache.beam.sdk.io.Read.Unbounded unbounded = + org.apache.beam.sdk.io.Read.from( + new SqsUnboundedSource( + this, + new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class)))); + + PTransform> transform = unbounded; + + if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) { + transform = unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords()); + } + + return input.getPipeline().apply(transform); + } + } + + /** + * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage + * and configuration. + */ + @AutoValue + public abstract static class Write extends PTransform, PDone> { + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Write build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply( + ParDo.of( + new SqsWriteFn( + new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class))))); + return PDone.in(input.getPipeline()); + } + } + + private static class SqsWriteFn extends DoFn { + private final SqsConfiguration sqsConfiguration; + private transient AmazonSQS sqs; + + SqsWriteFn(SqsConfiguration sqsConfiguration) { + this.sqsConfiguration = sqsConfiguration; + } + + @Setup + public void setup() { + sqs = + AmazonSQSClientBuilder.standard() + .withClientConfiguration(sqsConfiguration.getClientConfiguration()) + .withCredentials(sqsConfiguration.getAwsCredentialsProvider()) + .withRegion(sqsConfiguration.getAwsRegion()) + .build(); + } + + @ProcessElement + public void processElement(ProcessContext processContext) throws Exception { + sqs.sendMessage(processContext.element()); + } + + @Teardown + public void teardown() throws Exception { + if (sqs != null) { + sqs.shutdown(); + } + } + } +} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java new file mode 100644 index 0000000000000..e6dd829b3f00b --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageSystemAttributeName; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; + +class SqsUnboundedReader extends UnboundedSource.UnboundedReader implements Serializable { + + public static final int MAX_NUMBER_OF_MESSAGES = 10; + private final SqsUnboundedSource source; + private Message current; + private final Queue messagesNotYetRead; + private List messagesToDelete; + private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + + public SqsUnboundedReader(SqsUnboundedSource source, SqsCheckpointMark sqsCheckpointMark) { + this.source = source; + this.current = null; + + this.messagesNotYetRead = new ArrayDeque<>(); + this.messagesToDelete = new ArrayList<>(); + + if (sqsCheckpointMark != null) { + this.messagesToDelete.addAll(sqsCheckpointMark.getMessagesToDelete()); + } + } + + @Override + public Instant getWatermark() { + return oldestPendingTimestamp; + } + + @Override + public Message getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + + return getTimestamp(current); + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current.getMessageId().getBytes(StandardCharsets.UTF_8); + } + + @Override + public CheckpointMark getCheckpointMark() { + return new SqsCheckpointMark(this, messagesToDelete); + } + + @Override + public SqsUnboundedSource getCurrentSource() { + return source; + } + + @Override + public boolean start() { + return advance(); + } + + @Override + public boolean advance() { + if (messagesNotYetRead.isEmpty()) { + pull(); + } + + current = messagesNotYetRead.poll(); + if (current == null) { + return false; + } + + messagesToDelete.add(current); + + Instant currentMessageTimestamp = getCurrentTimestamp(); + if (getCurrentTimestamp().isBefore(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + + return true; + } + + @Override + public void close() {} + + void delete(final Collection messages) { + for (Message message : messages) { + if (messagesToDelete.contains(message)) { + source.getSqs().deleteMessage(source.getRead().queueUrl(), message.getReceiptHandle()); + Instant currentMessageTimestamp = getTimestamp(message); + if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + } + } + } + + private void pull() { + final ReceiveMessageRequest receiveMessageRequest = + new ReceiveMessageRequest(source.getRead().queueUrl()); + + receiveMessageRequest.setMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES); + receiveMessageRequest.setAttributeNames( + Arrays.asList(MessageSystemAttributeName.SentTimestamp.toString())); + final ReceiveMessageResult receiveMessageResult = + source.getSqs().receiveMessage(receiveMessageRequest); + + final List messages = receiveMessageResult.getMessages(); + + if (messages == null || messages.isEmpty()) { + return; + } + + for (Message message : messages) { + messagesNotYetRead.add(message); + } + } + + private Instant getTimestamp(final Message message) { + return new Instant( + Long.parseLong( + message.getAttributes().get(MessageSystemAttributeName.SentTimestamp.toString()))); + } +} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java new file mode 100644 index 0000000000000..3d4e0865ac42b --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.aws.sqs.SqsIO.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +class SqsUnboundedSource extends UnboundedSource { + + private final Read read; + private final SqsConfiguration sqsConfiguration; + private final Supplier sqs; + + public SqsUnboundedSource(Read read, SqsConfiguration sqsConfiguration) { + this.read = read; + this.sqsConfiguration = sqsConfiguration; + + sqs = + Suppliers.memoize( + (Supplier & Serializable) + () -> + AmazonSQSClientBuilder.standard() + .withClientConfiguration(sqsConfiguration.getClientConfiguration()) + .withCredentials(sqsConfiguration.getAwsCredentialsProvider()) + .withRegion(sqsConfiguration.getAwsRegion()) + .build()); + } + + @Override + public List split(int desiredNumSplits, PipelineOptions options) { + List sources = new ArrayList<>(); + for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) { + sources.add(new SqsUnboundedSource(read, sqsConfiguration)); + } + return sources; + } + + @Override + public UnboundedReader createReader( + PipelineOptions options, @Nullable SqsCheckpointMark checkpointMark) { + return new SqsUnboundedReader(this, checkpointMark); + } + + @Override + public Coder getCheckpointMarkCoder() { + return SerializableCoder.of(SqsCheckpointMark.class); + } + + @Override + public Coder getOutputCoder() { + return SerializableCoder.of(Message.class); + } + + public Read getRead() { + return read; + } + + public AmazonSQS getSqs() { + return sqs.get(); + } + + @Override + public boolean requiresDeduping() { + return true; + } +} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java new file mode 100644 index 0000000000000..601844d70c3b9 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Defines IO connectors for Amazon Web Services SQS. */ +package org.apache.beam.sdk.io.aws2.sqs; diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java new file mode 100644 index 0000000000000..03dc88b289c9c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.model.Message; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { + + private final List messagesToDelete; + private final transient Optional reader; + + public SqsCheckpointMark(SqsUnboundedReader reader, Collection messagesToDelete) { + this.reader = Optional.of(reader); + this.messagesToDelete = ImmutableList.copyOf(messagesToDelete); + ; + } + + @Override + public void finalizeCheckpoint() { + reader.ifPresent(r -> r.delete(messagesToDelete)); + } + + List getMessagesToDelete() { + return messagesToDelete; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqsCheckpointMark that = (SqsCheckpointMark) o; + return Objects.equal(messagesToDelete, that.messagesToDelete); + } + + @Override + public int hashCode() { + return Objects.hashCode(messagesToDelete); + } +} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java new file mode 100644 index 0000000000000..bb971f87ec93d --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsConfiguration.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.io.aws.options.AwsModule; +import org.apache.beam.sdk.io.aws.options.AwsOptions; + +import java.io.IOException; +import java.io.Serializable; + +class SqsConfiguration implements Serializable { + + private String awsRegion; + private String awsCredentialsProviderString; + private String awsClientConfigurationString; + + public SqsConfiguration(AwsOptions awsOptions) { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + this.awsCredentialsProviderString = + om.writeValueAsString(awsOptions.getAwsCredentialsProvider()); + } catch (JsonProcessingException e) { + this.awsCredentialsProviderString = null; + } + + try { + this.awsClientConfigurationString = + om.writeValueAsString(awsOptions.getClientConfiguration()); + } catch (JsonProcessingException e) { + this.awsClientConfigurationString = null; + } + + this.awsRegion = awsOptions.getAwsRegion(); + } + + public AWSCredentialsProvider getAwsCredentialsProvider() { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + return om.readValue(awsCredentialsProviderString, AWSCredentialsProvider.class); + } catch (IOException e) { + return null; + } + } + + public ClientConfiguration getClientConfiguration() { + ObjectMapper om = new ObjectMapper(); + om.registerModule(new AwsModule()); + try { + return om.readValue(awsClientConfigurationString, ClientConfiguration.class); + } catch (IOException e) { + return null; + } + } + + public String getAwsRegion() { + return awsRegion; + } +} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java new file mode 100644 index 0000000000000..2f9eb3d1a3e42 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.aws.options.AwsOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; + +import javax.annotation.Nullable; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +/** + * An unbounded source for Amazon Simple Queue Service (SQS). + * + *

Reading from an SQS queue

+ * + *

The {@link SqsIO} {@link Read} returns an unbounded {@link PCollection} of {@link + * com.amazonaws.services.sqs.model.Message} containing the received messages. Note: This source + * does not currently advance the watermark when no new messages are received. + * + *

To configure an SQS source, you have to provide the queueUrl to connect to. The following + * example illustrates how to configure the source: + * + *

{@code
+ * pipeline.apply(SqsIO.read().withQueueUrl(queueUrl))
+ * }
+ * + *

Writing to an SQS queue

+ * + *

The following example illustrates how to use the sink: + * + *

{@code
+ * pipeline
+ *   .apply(...) // returns PCollection
+ *   .apply(SqsIO.write())
+ * }
+ * + *

Additional Configuration

+ * + *

Additional configuration can be provided via {@link AwsOptions} from command line args or in + * code. For example, if you wanted to provide a secret access key via code: + * + *

{@code
+ * PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().create();
+ * AwsOptions awsOptions = pipelineOptions.as(AwsOptions.class);
+ * BasicAWSCredentials awsCreds = new BasicAWSCredentials("accesskey", "secretkey");
+ * awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
+ * Pipeline pipeline = Pipeline.create(options);
+ * }
+ * + *

For more information on the available options see {@link AwsOptions}. + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class SqsIO { + + public static Read read() { + return new AutoValue_SqsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); + } + + public static Write write() { + return new AutoValue_SqsIO_Write.Builder().build(); + } + + private SqsIO() {} + + /** + * A {@link PTransform} to read/receive messages from SQS. See {@link SqsIO} for more information + * on usage and configuration. + */ + @AutoValue + public abstract static class Read extends PTransform> { + + @Nullable + abstract String queueUrl(); + + abstract long maxNumRecords(); + + @Nullable + abstract Duration maxReadTime(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setQueueUrl(String queueUrl); + + abstract Builder setMaxNumRecords(long maxNumRecords); + + abstract Builder setMaxReadTime(Duration maxReadTime); + + abstract Read build(); + } + + /** + * Define the max number of records received by the {@link Read}. When the max number of records + * is lower than {@code Long.MAX_VALUE}, the {@link Read} will provide a bounded {@link + * PCollection}. + */ + public Read withMaxNumRecords(long maxNumRecords) { + return toBuilder().setMaxNumRecords(maxNumRecords).build(); + } + + /** + * Define the max read time (duration) while the {@link Read} will receive messages. When this + * max read time is not null, the {@link Read} will provide a bounded {@link PCollection}. + */ + public Read withMaxReadTime(Duration maxReadTime) { + return toBuilder().setMaxReadTime(maxReadTime).build(); + } + + /** Define the queueUrl used by the {@link Read} to receive messages from SQS. */ + public Read withQueueUrl(String queueUrl) { + checkArgument(queueUrl != null, "queueUrl can not be null"); + checkArgument(!queueUrl.isEmpty(), "queueUrl can not be empty"); + return toBuilder().setQueueUrl(queueUrl).build(); + } + + @Override + public PCollection expand(PBegin input) { + + org.apache.beam.sdk.io.Read.Unbounded unbounded = + org.apache.beam.sdk.io.Read.from( + new SqsUnboundedSource( + this, + new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class)))); + + PTransform> transform = unbounded; + + if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) { + transform = unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords()); + } + + return input.getPipeline().apply(transform); + } + } + + /** + * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage + * and configuration. + */ + @AutoValue + public abstract static class Write extends PTransform, PDone> { + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Write build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply( + ParDo.of( + new SqsWriteFn( + new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class))))); + return PDone.in(input.getPipeline()); + } + } + + private static class SqsWriteFn extends DoFn { + private final SqsConfiguration sqsConfiguration; + private transient AmazonSQS sqs; + + SqsWriteFn(SqsConfiguration sqsConfiguration) { + this.sqsConfiguration = sqsConfiguration; + } + + @Setup + public void setup() { + sqs = + AmazonSQSClientBuilder.standard() + .withClientConfiguration(sqsConfiguration.getClientConfiguration()) + .withCredentials(sqsConfiguration.getAwsCredentialsProvider()) + .withRegion(sqsConfiguration.getAwsRegion()) + .build(); + } + + @ProcessElement + public void processElement(ProcessContext processContext) throws Exception { + sqs.sendMessage(processContext.element()); + } + + @Teardown + public void teardown() throws Exception { + if (sqs != null) { + sqs.shutdown(); + } + } + } +} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java new file mode 100644 index 0000000000000..e6dd829b3f00b --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageSystemAttributeName; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; + +class SqsUnboundedReader extends UnboundedSource.UnboundedReader implements Serializable { + + public static final int MAX_NUMBER_OF_MESSAGES = 10; + private final SqsUnboundedSource source; + private Message current; + private final Queue messagesNotYetRead; + private List messagesToDelete; + private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + + public SqsUnboundedReader(SqsUnboundedSource source, SqsCheckpointMark sqsCheckpointMark) { + this.source = source; + this.current = null; + + this.messagesNotYetRead = new ArrayDeque<>(); + this.messagesToDelete = new ArrayList<>(); + + if (sqsCheckpointMark != null) { + this.messagesToDelete.addAll(sqsCheckpointMark.getMessagesToDelete()); + } + } + + @Override + public Instant getWatermark() { + return oldestPendingTimestamp; + } + + @Override + public Message getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + + return getTimestamp(current); + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current.getMessageId().getBytes(StandardCharsets.UTF_8); + } + + @Override + public CheckpointMark getCheckpointMark() { + return new SqsCheckpointMark(this, messagesToDelete); + } + + @Override + public SqsUnboundedSource getCurrentSource() { + return source; + } + + @Override + public boolean start() { + return advance(); + } + + @Override + public boolean advance() { + if (messagesNotYetRead.isEmpty()) { + pull(); + } + + current = messagesNotYetRead.poll(); + if (current == null) { + return false; + } + + messagesToDelete.add(current); + + Instant currentMessageTimestamp = getCurrentTimestamp(); + if (getCurrentTimestamp().isBefore(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + + return true; + } + + @Override + public void close() {} + + void delete(final Collection messages) { + for (Message message : messages) { + if (messagesToDelete.contains(message)) { + source.getSqs().deleteMessage(source.getRead().queueUrl(), message.getReceiptHandle()); + Instant currentMessageTimestamp = getTimestamp(message); + if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + } + } + } + + private void pull() { + final ReceiveMessageRequest receiveMessageRequest = + new ReceiveMessageRequest(source.getRead().queueUrl()); + + receiveMessageRequest.setMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES); + receiveMessageRequest.setAttributeNames( + Arrays.asList(MessageSystemAttributeName.SentTimestamp.toString())); + final ReceiveMessageResult receiveMessageResult = + source.getSqs().receiveMessage(receiveMessageRequest); + + final List messages = receiveMessageResult.getMessages(); + + if (messages == null || messages.isEmpty()) { + return; + } + + for (Message message : messages) { + messagesNotYetRead.add(message); + } + } + + private Instant getTimestamp(final Message message) { + return new Instant( + Long.parseLong( + message.getAttributes().get(MessageSystemAttributeName.SentTimestamp.toString()))); + } +} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java new file mode 100644 index 0000000000000..3d4e0865ac42b --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedSource.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.sqs; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.aws.sqs.SqsIO.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +class SqsUnboundedSource extends UnboundedSource { + + private final Read read; + private final SqsConfiguration sqsConfiguration; + private final Supplier sqs; + + public SqsUnboundedSource(Read read, SqsConfiguration sqsConfiguration) { + this.read = read; + this.sqsConfiguration = sqsConfiguration; + + sqs = + Suppliers.memoize( + (Supplier & Serializable) + () -> + AmazonSQSClientBuilder.standard() + .withClientConfiguration(sqsConfiguration.getClientConfiguration()) + .withCredentials(sqsConfiguration.getAwsCredentialsProvider()) + .withRegion(sqsConfiguration.getAwsRegion()) + .build()); + } + + @Override + public List split(int desiredNumSplits, PipelineOptions options) { + List sources = new ArrayList<>(); + for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) { + sources.add(new SqsUnboundedSource(read, sqsConfiguration)); + } + return sources; + } + + @Override + public UnboundedReader createReader( + PipelineOptions options, @Nullable SqsCheckpointMark checkpointMark) { + return new SqsUnboundedReader(this, checkpointMark); + } + + @Override + public Coder getCheckpointMarkCoder() { + return SerializableCoder.of(SqsCheckpointMark.class); + } + + @Override + public Coder getOutputCoder() { + return SerializableCoder.of(Message.class); + } + + public Read getRead() { + return read; + } + + public AmazonSQS getSqs() { + return sqs.get(); + } + + @Override + public boolean requiresDeduping() { + return true; + } +} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java new file mode 100644 index 0000000000000..601844d70c3b9 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Defines IO connectors for Amazon Web Services SQS. */ +package org.apache.beam.sdk.io.aws2.sqs;