Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
cmachgodaddy committed Oct 30, 2019
1 parent b02bc8a commit 29653f8
Show file tree
Hide file tree
Showing 12 changed files with 1,284 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import com.amazonaws.services.sqs.model.Message;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

private final List<Message> messagesToDelete;
private final transient Optional<SqsUnboundedReader> reader;

public SqsCheckpointMark(SqsUnboundedReader reader, Collection<Message> messagesToDelete) {
this.reader = Optional.of(reader);
this.messagesToDelete = ImmutableList.copyOf(messagesToDelete);
;
}

@Override
public void finalizeCheckpoint() {
reader.ifPresent(r -> r.delete(messagesToDelete));
}

List<Message> getMessagesToDelete() {
return messagesToDelete;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqsCheckpointMark that = (SqsCheckpointMark) o;
return Objects.equal(messagesToDelete, that.messagesToDelete);
}

@Override
public int hashCode() {
return Objects.hashCode(messagesToDelete);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.io.aws.options.AwsModule;
import org.apache.beam.sdk.io.aws.options.AwsOptions;

import java.io.IOException;
import java.io.Serializable;

class SqsConfiguration implements Serializable {

private String awsRegion;
private String awsCredentialsProviderString;
private String awsClientConfigurationString;

public SqsConfiguration(AwsOptions awsOptions) {
ObjectMapper om = new ObjectMapper();
om.registerModule(new AwsModule());
try {
this.awsCredentialsProviderString =
om.writeValueAsString(awsOptions.getAwsCredentialsProvider());
} catch (JsonProcessingException e) {
this.awsCredentialsProviderString = null;
}

try {
this.awsClientConfigurationString =
om.writeValueAsString(awsOptions.getClientConfiguration());
} catch (JsonProcessingException e) {
this.awsClientConfigurationString = null;
}

this.awsRegion = awsOptions.getAwsRegion();
}

public AWSCredentialsProvider getAwsCredentialsProvider() {
ObjectMapper om = new ObjectMapper();
om.registerModule(new AwsModule());
try {
return om.readValue(awsCredentialsProviderString, AWSCredentialsProvider.class);
} catch (IOException e) {
return null;
}
}

public ClientConfiguration getClientConfiguration() {
ObjectMapper om = new ObjectMapper();
om.registerModule(new AwsModule());
try {
return om.readValue(awsClientConfigurationString, ClientConfiguration.class);
} catch (IOException e) {
return null;
}
}

public String getAwsRegion() {
return awsRegion;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

import javax.annotation.Nullable;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

/**
* An unbounded source for Amazon Simple Queue Service (SQS).
*
* <h3>Reading from an SQS queue</h3>
*
* <p>The {@link SqsIO} {@link Read} returns an unbounded {@link PCollection} of {@link
* com.amazonaws.services.sqs.model.Message} containing the received messages. Note: This source
* does not currently advance the watermark when no new messages are received.
*
* <p>To configure an SQS source, you have to provide the queueUrl to connect to. The following
* example illustrates how to configure the source:
*
* <pre>{@code
* pipeline.apply(SqsIO.read().withQueueUrl(queueUrl))
* }</pre>
*
* <h3>Writing to an SQS queue</h3>
*
* <p>The following example illustrates how to use the sink:
*
* <pre>{@code
* pipeline
* .apply(...) // returns PCollection<SendMessageRequest>
* .apply(SqsIO.write())
* }</pre>
*
* <h3>Additional Configuration</h3>
*
* <p>Additional configuration can be provided via {@link AwsOptions} from command line args or in
* code. For example, if you wanted to provide a secret access key via code:
*
* <pre>{@code
* PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().create();
* AwsOptions awsOptions = pipelineOptions.as(AwsOptions.class);
* BasicAWSCredentials awsCreds = new BasicAWSCredentials("accesskey", "secretkey");
* awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
* Pipeline pipeline = Pipeline.create(options);
* }</pre>
*
* <p>For more information on the available options see {@link AwsOptions}.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class SqsIO {

public static Read read() {
return new AutoValue_SqsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
}

public static Write write() {
return new AutoValue_SqsIO_Write.Builder().build();
}

private SqsIO() {}

/**
* A {@link PTransform} to read/receive messages from SQS. See {@link SqsIO} for more information
* on usage and configuration.
*/
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Message>> {

@Nullable
abstract String queueUrl();

abstract long maxNumRecords();

@Nullable
abstract Duration maxReadTime();

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setQueueUrl(String queueUrl);

abstract Builder setMaxNumRecords(long maxNumRecords);

abstract Builder setMaxReadTime(Duration maxReadTime);

abstract Read build();
}

/**
* Define the max number of records received by the {@link Read}. When the max number of records
* is lower than {@code Long.MAX_VALUE}, the {@link Read} will provide a bounded {@link
* PCollection}.
*/
public Read withMaxNumRecords(long maxNumRecords) {
return toBuilder().setMaxNumRecords(maxNumRecords).build();
}

/**
* Define the max read time (duration) while the {@link Read} will receive messages. When this
* max read time is not null, the {@link Read} will provide a bounded {@link PCollection}.
*/
public Read withMaxReadTime(Duration maxReadTime) {
return toBuilder().setMaxReadTime(maxReadTime).build();
}

/** Define the queueUrl used by the {@link Read} to receive messages from SQS. */
public Read withQueueUrl(String queueUrl) {
checkArgument(queueUrl != null, "queueUrl can not be null");
checkArgument(!queueUrl.isEmpty(), "queueUrl can not be empty");
return toBuilder().setQueueUrl(queueUrl).build();
}

@Override
public PCollection<Message> expand(PBegin input) {

org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
org.apache.beam.sdk.io.Read.from(
new SqsUnboundedSource(
this,
new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class))));

PTransform<PBegin, PCollection<Message>> transform = unbounded;

if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
transform = unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
}

return input.getPipeline().apply(transform);
}
}

/**
* A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more information on usage
* and configuration.
*/
@AutoValue
public abstract static class Write extends PTransform<PCollection<SendMessageRequest>, PDone> {
abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
abstract Write build();
}

@Override
public PDone expand(PCollection<SendMessageRequest> input) {
input.apply(
ParDo.of(
new SqsWriteFn(
new SqsConfiguration(input.getPipeline().getOptions().as(AwsOptions.class)))));
return PDone.in(input.getPipeline());
}
}

private static class SqsWriteFn extends DoFn<SendMessageRequest, Void> {
private final SqsConfiguration sqsConfiguration;
private transient AmazonSQS sqs;

SqsWriteFn(SqsConfiguration sqsConfiguration) {
this.sqsConfiguration = sqsConfiguration;
}

@Setup
public void setup() {
sqs =
AmazonSQSClientBuilder.standard()
.withClientConfiguration(sqsConfiguration.getClientConfiguration())
.withCredentials(sqsConfiguration.getAwsCredentialsProvider())
.withRegion(sqsConfiguration.getAwsRegion())
.build();
}

@ProcessElement
public void processElement(ProcessContext processContext) throws Exception {
sqs.sendMessage(processContext.element());
}

@Teardown
public void teardown() throws Exception {
if (sqs != null) {
sqs.shutdown();
}
}
}
}
Loading

0 comments on commit 29653f8

Please sign in to comment.