Skip to content

Commit

Permalink
[BEAM-7636] Migrate SqsIO to AWS SDK V2 for Java
Browse files Browse the repository at this point in the history
  • Loading branch information
cmachgodaddy authored and aromanenko-dev committed Nov 22, 2019
1 parent 7a8a26b commit 07d952f
Show file tree
Hide file tree
Showing 17 changed files with 1,166 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_dynamodb : "software.amazon.awssdk:dynamodb:$aws_java_sdk2_version",
aws_java_sdk2_sdk_core : "software.amazon.awssdk:sdk-core:$aws_java_sdk2_version",
aws_java_sdk2_sns : "software.amazon.awssdk:sns:$aws_java_sdk2_version",
aws_java_sdk2_sqs : "software.amazon.awssdk:sqs:$aws_java_sdk2_version",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ dependencies {
compile library.java.aws_java_sdk2_dynamodb
compile library.java.aws_java_sdk2_sdk_core
compile library.java.aws_java_sdk2_sns
compile library.java.aws_java_sdk2_sqs
compile library.java.jackson_core
compile library.java.jackson_annotations
compile library.java.jackson_databind
compile library.java.slf4j_api

testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1'
testCompile 'org.testcontainers:testcontainers:1.11.3'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.net.URI;
import javax.annotation.Nullable;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;

/** Basic implementation of {@link SqsClientProvider} used by default in {@link SqsIO}. */
class BasicSqsClientProvider implements SqsClientProvider {
private final AwsCredentialsProvider awsCredentialsProvider;
private final String region;
@Nullable private final URI serviceEndpoint;

BasicSqsClientProvider(
AwsCredentialsProvider awsCredentialsProvider, String region, @Nullable URI serviceEndpoint) {
checkArgument(awsCredentialsProvider != null, "awsCredentialsProvider can not be null");
checkArgument(region != null, "region can not be null");
this.awsCredentialsProvider = awsCredentialsProvider;
this.region = region;
this.serviceEndpoint = serviceEndpoint;
}

@Override
public SqsClient getSqsClient() {
SqsClientBuilder builder =
SqsClient.builder().credentialsProvider(awsCredentialsProvider).region(Region.of(region));

if (serviceEndpoint != null) {
builder.endpointOverride(serviceEndpoint);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import software.amazon.awssdk.services.sqs.model.Message;

/** Custom Coder for handling SendMessageRequest for using in Write. */
public class MessageCoder extends AtomicCoder<Message> implements Serializable {
private static final MessageCoder INSTANCE = new MessageCoder();

private MessageCoder() {}

static MessageCoder of() {
return INSTANCE;
}

@Override
public void encode(Message value, OutputStream outStream) throws IOException {
StringUtf8Coder.of().encode(value.messageId(), outStream);
StringUtf8Coder.of().encode(value.body(), outStream);
}

@Override
public Message decode(InputStream inStream) throws IOException {
final String messageId = StringUtf8Coder.of().decode(inStream);
final String body = StringUtf8Coder.of().decode(inStream);
return Message.builder().messageId(messageId).body(body).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import com.google.auto.service.AutoService;
import java.util.List;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviderRegistrar;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import software.amazon.awssdk.services.sqs.model.Message;

/** A {@link CoderProviderRegistrar} for standard types used with {@link SqsIO}. */
@AutoService(CoderProviderRegistrar.class)
public class MessageCoderRegistrar implements CoderProviderRegistrar {
@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
CoderProviders.forCoder(TypeDescriptor.of(Message.class), MessageCoder.of()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/** Custom Coder for handling SendMessageRequest for using in Write. */
public class SendMessageRequestCoder extends AtomicCoder<SendMessageRequest>
implements Serializable {
private static final SendMessageRequestCoder INSTANCE = new SendMessageRequestCoder();

private SendMessageRequestCoder() {}

static SendMessageRequestCoder of() {
return INSTANCE;
}

@Override
public void encode(SendMessageRequest value, OutputStream outStream) throws IOException {
StringUtf8Coder.of().encode(value.queueUrl(), outStream);
StringUtf8Coder.of().encode(value.messageBody(), outStream);
}

@Override
public SendMessageRequest decode(InputStream inStream) throws IOException {
final String queueUrl = StringUtf8Coder.of().decode(inStream);
final String message = StringUtf8Coder.of().decode(inStream);
return SendMessageRequest.builder().queueUrl(queueUrl).messageBody(message).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import com.google.auto.service.AutoService;
import java.util.List;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviderRegistrar;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/** A {@link CoderProviderRegistrar} for standard types used with {@link SqsIO}. */
@AutoService(CoderProviderRegistrar.class)
public class SendMessageRequestCoderRegistrar implements CoderProviderRegistrar {
@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
CoderProviders.forCoder(
TypeDescriptor.of(SendMessageRequest.class), SendMessageRequestCoder.of()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import software.amazon.awssdk.services.sqs.model.Message;

class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

private final List<Message> messagesToDelete;
private final transient Optional<SqsUnboundedReader> reader;

SqsCheckpointMark(SqsUnboundedReader reader, Collection<Message> messagesToDelete) {
this.reader = Optional.of(reader);
this.messagesToDelete = ImmutableList.copyOf(messagesToDelete);
}

@Override
public void finalizeCheckpoint() {
reader.ifPresent(r -> r.delete(messagesToDelete));
}

List<Message> getMessagesToDelete() {
return messagesToDelete;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqsCheckpointMark that = (SqsCheckpointMark) o;
return Objects.equal(messagesToDelete, that.messagesToDelete);
}

@Override
public int hashCode() {
return Objects.hashCode(messagesToDelete);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.Serializable;
import software.amazon.awssdk.services.sqs.SqsClient;

/**
* Provides instances of Sqs clients.
*
* <p>Please note, that any instance of {@link SqsClientProvider} must be {@link Serializable} to
* ensure it can be sent to worker machines.
*/
public interface SqsClientProvider extends Serializable {
SqsClient getSqsClient();
}
Loading

0 comments on commit 07d952f

Please sign in to comment.