Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS entry point #242

Merged
merged 2 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ Gradle plugin to create a java and kotlin application based on Clean Architectur
- [Plugin Implementation](#plugin-implementation)
- [Tasks](#tasks)
- [Generate Project](#generate-project)
- [Generate Model](#generate-model)
- [Generate Use Case](#generate-use-case)
- [Generate Model for Java an Kotlin](#generate-model-for-java-an-kotlin)
- [Generate Use Case for Java and Kotlin](#generate-use-case-for-java-and-kotlin)
- [Generate Driven Adapter](#generate-driven-adapter)
- [Generate Entry Point](#generate-entry-point)
- [Generate Helper](#generate-helper)
- [Generate Pipeline](#generate-pipeline)
- [Generate Acceptance Tests](#generate-acceptance-test)
- [Generate Acceptance Test](#generate-acceptance-test)
- [Validate Structure](#validate-structure)
- [Delete Module](#delete-module)
- [Update Project](#update-project)
Expand Down Expand Up @@ -353,15 +353,16 @@ The **`generateEntryPoint | gep`** task will generate a module in Infrastructure
gradle gep --type [entryPointType]
```

| Reference for **entryPointType** | Name | Additional Options |Java | Kotlin |
|----------------------------------|----------------------------------------|------------------------------------------|------|--------|
| generic | Empty Entry Point | --name [name] |☑|☑|
| restmvc | API REST (Spring Boot Starter Web) | --server [serverOption] default undertow |☑|☑|
| webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true |☑|☑|
| rsocket | Rsocket Controller Entry Point | |☑|☑|
| graphql | API GraphQL | --pathgql [name path] default /graphql |☑|☑|
| asynceventhandler | Async Event Handler | |☑|☑|
| mq | JMS MQ Client to listen messages | |☑|☑|
| Reference for **entryPointType** | Name | Additional Options |Java | Kotlin |
|----------------------------------|----------------------------------------|------------------------------------------|-------|---------|
| generic | Empty Entry Point | --name [name] |☑| ☑ |
| restmvc | API REST (Spring Boot Starter Web) | --server [serverOption] default undertow |☑| ☑ |
| webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true |☑| ☑ |
| rsocket | Rsocket Controller Entry Point | |☑| ☑ |
| graphql | API GraphQL | --pathgql [name path] default /graphql |☑| ☑ |
| asynceventhandler | Async Event Handler | |☑| ☑ |
| mq | JMS MQ Client to listen messages | |☑| ☑ |
| sqs | SQS Listener | |☑| |

Additionally, if you'll use a restmvc, you can specify the web server on which the application will run. By default, undertow.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,26 @@ public void shouldGenerateMQDrivenAdapterNoReactive() {
assertEquals(result.task(":" + task).getOutcome(), TaskOutcome.SUCCESS);
}

@Test
public void shouldGenerateSQSEntryPoint() {
canRunTaskGenerateStructureReactiveProject();
String task = GENERATE_ENTRY_POINT;
String type = "SQS";

runner.withArguments(task, TYPE + type);
runner.withProjectDir(projectDir);
BuildResult result = runner.build();

assertTrue(
new File("build/functionalTest/infrastructure/entry-points/sqs-listener/build.gradle")
.exists());
assertTrue(
new File(
"build/functionalTest/infrastructure/entry-points/sqs-listener/src/main/java/co/com/bancolombia/sqs/listener/SQSProcessor.java")
.exists());
assertEquals(result.task(":" + task).getOutcome(), TaskOutcome.SUCCESS);
}

@Test
public void shouldUpdateProject() {
canRunTaskGenerateStructureWithOutParameters();
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/co/com/bancolombia/factory/ModuleBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public Set<String> findExpressions(String path, String regex) {
public void appendDependencyToModule(String module, String dependency) throws IOException {
logger.lifecycle("adding dependency {} to module {}", dependency, module);
String buildFilePath = project.getChildProjects().get(module).getBuildFile().getPath();
buildFilePath = buildFilePath.replace(project.getProjectDir().getPath(), ".");
if (isKotlin() && !buildFilePath.endsWith(KTS)) {
buildFilePath += KTS;
}
Expand Down Expand Up @@ -215,7 +216,8 @@ public void addParamPackage(String packageName) {
}

public void addFile(String path, String content) {
this.files.put(path, FileModel.builder().path(path).content(content).build());
String finalPath = FileUtils.toRelative(path);
this.files.put(finalPath, FileModel.builder().path(finalPath).content(content).build());
}

public void addDir(String path) {
Expand Down Expand Up @@ -358,10 +360,11 @@ private Boolean getABooleanProperty(String property) {
}

private String readFile(String path) throws IOException {
FileModel current = files.get(path);
String finalPath = FileUtils.toRelative(path);
FileModel current = files.get(finalPath);
String content;
if (current == null) {
content = FileUtils.readFile(getProject(), path).collect(Collectors.joining("\n"));
content = FileUtils.readFile(getProject(), finalPath).collect(Collectors.joining("\n"));
} else {
content = current.getContent();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package co.com.bancolombia.factory.entrypoints;

import static co.com.bancolombia.Constants.APP_SERVICE;
import static co.com.bancolombia.utils.Utils.buildImplementationFromProject;

import co.com.bancolombia.exceptions.CleanException;
import co.com.bancolombia.factory.ModuleBuilder;
import co.com.bancolombia.factory.ModuleFactory;
import co.com.bancolombia.factory.validations.ReactiveTypeValidation;
import java.io.IOException;

public class EntryPointSQS implements ModuleFactory {

@Override
public void buildModule(ModuleBuilder builder) throws IOException, CleanException {
builder.runValidations(ReactiveTypeValidation.class);

builder.setupFromTemplate("entry-point/sqs");
builder.appendToSettings("sqs-listener", "infrastructure/entry-points");
String dependency = buildImplementationFromProject(builder.isKotlin(), ":sqs-listener");
builder.appendDependencyToModule(APP_SERVICE, dependency);

builder.addAwsBom();
builder
.appendToProperties("entrypoint.sqs")
.put("region", "us-east-1")
.put("queueUrl", "http://localhost:4566/000000000000/sample")
.put("waitTimeSeconds", 20)
.put("maxNumberOfMessages", 10)
.put("visibilityTimeout", 10000)
.put("numberOfThreads", 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public static ModuleFactory getEntryPointFactory(EntryPointType type)
return new EntryPointAsyncEventHandler();
case MQ:
return new EntryPointMQ();
case SQS:
return new EntryPointSQS();
default:
throw new InvalidTaskOptionException("Entry Point type invalid");
}
Expand All @@ -34,6 +36,7 @@ public enum EntryPointType {
RSOCKET,
GRAPHQL,
ASYNCEVENTHANDLER,
MQ
MQ,
SQS
}
}
10 changes: 10 additions & 0 deletions src/main/java/co/com/bancolombia/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,14 @@ public static String parseToYaml(ObjectNode node) throws IOException {
public static boolean exists(String dir, String file) {
return Files.exists(Paths.get(dir, file));
}

public static String toRelative(String path) {
if (path.startsWith("./")) {
return path;
}
if (path.startsWith("/")) {
return path;
}
return "./" + path;
}
}
7 changes: 7 additions & 0 deletions src/main/resources/entry-point/sqs/build.gradle.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dependencies {
implementation project(':model')
implementation project(':usecase')
implementation 'org.springframework.boot:spring-boot'
implementation 'software.amazon.awssdk:sqs'
implementation 'org.apache.logging.log4j:log4j-api'
}
16 changes: 16 additions & 0 deletions src/main/resources/entry-point/sqs/definition.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"folders": [
"infrastructure/entry-points/sqs-listener/src/test/{{language}}/{{packagePath}}/sqs/listener/helper",
"infrastructure/entry-points/sqs-listener/src/test/{{language}}/{{packagePath}}/sqs/listener/config"
],
"files": {},
"java": {
"entry-point/sqs/build.gradle.mustache": "infrastructure/entry-points/sqs-listener/build.gradle",
"entry-point/sqs/sqs-config.java.mustache": "infrastructure/entry-points/sqs-listener/src/main/{{language}}/{{packagePath}}/sqs/listener/config/SQSConfig.java",
"entry-point/sqs/sqs-listener.java.mustache": "infrastructure/entry-points/sqs-listener/src/main/{{language}}/{{packagePath}}/sqs/listener/helper/SQSListener.java",
"entry-point/sqs/sqs-processor.java.mustache": "infrastructure/entry-points/sqs-listener/src/main/{{language}}/{{packagePath}}/sqs/listener/SQSProcessor.java",
"entry-point/sqs/sqs-properties.java.mustache": "infrastructure/entry-points/sqs-listener/src/main/{{language}}/{{packagePath}}/sqs/listener/config/SQSProperties.java"
},
"kotlin": {
}
}
53 changes: 53 additions & 0 deletions src/main/resources/entry-point/sqs/sqs-config.java.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package {{package}}.sqs.listener.config;

import {{package}}.sqs.listener.helper.SQSListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.net.URI;
import java.util.function.Function;

@Configuration
public class SQSConfig {

@Bean
public SQSListener sqsListener(SqsAsyncClient client, SQSProperties properties, Function<Message, Mono<Void>> fn) {
return SQSListener.builder()
.client(client)
.properties(properties)
.processor(fn)
.build()
.start();
}

@Bean
public SqsAsyncClient configSqs(SQSProperties properties) {
return SqsAsyncClient.builder()
//.endpointOverride(URI.create("http://localhost:4566")) // TODO: enable only for localstack
.region(Region.of(properties.getRegion()))
.credentialsProvider(getProviderChain())
.build();
}

private AwsCredentialsProviderChain getProviderChain() {
return AwsCredentialsProviderChain.builder()
.addCredentialsProvider(EnvironmentVariableCredentialsProvider.create())
.addCredentialsProvider(SystemPropertyCredentialsProvider.create())
.addCredentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
.addCredentialsProvider(ProfileCredentialsProvider.create())
.addCredentialsProvider(ContainerCredentialsProvider.builder().build())
.addCredentialsProvider(InstanceProfileCredentialsProvider.create())
.build();
}
}
124 changes: 124 additions & 0 deletions src/main/resources/entry-point/sqs/sqs-listener.java.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package {{package}}.sqs.listener.helper;

import {{package}}.sqs.listener.config.SQSProperties;
{{#lombok}}
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
{{/lombok}}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

{{#lombok}}
@Log4j2
@Builder
{{/lombok}}
public class SQSListener {
{{^lombok}}
private static final org.apache.logging.log4j.Logger log = org.apache.logging.log4j.LogManager.getLogger(SQSListener.class);
{{/lombok}}
private final SqsAsyncClient client;
private final SQSProperties properties;
private final Function<Message, Mono<Void>> processor;
{{^lombok}}

public SQSListener(final SqsAsyncClient client, final SQSProperties properties, final Function<Message, Mono<Void>> processor) {
this.client = client;
this.properties = properties;
this.processor = processor;
}
{{/lombok}}

public SQSListener start() {
ExecutorService service = Executors.newFixedThreadPool(properties.getNumberOfThreads());
Flux<Void> flow = listenRetryRepeat().publishOn(Schedulers.fromExecutorService(service));
for (int i = 0; i < properties.getNumberOfThreads(); i++) {
flow.subscribe();
}
return this;
}

private Flux<Void> listenRetryRepeat() {
return listen()
.doOnError(e -> log.error("Error listening sqs queue", e))
.repeat();
}

private Flux<Void> listen() {
return getMessages()
.flatMap(message -> processor.apply(message).then(confirm(message)))
.onErrorContinue((e, o) -> log.error("Error listening sqs message", e));
}

private Mono<Void> confirm(Message message) {
return Mono.fromCallable(() -> getDeleteMessageRequest(message.receiptHandle()))
.flatMap(request -> Mono.fromFuture(client.deleteMessage(request)))
.then();
}

private Flux<Message> getMessages() {
return Mono.fromCallable(this::getReceiveMessageRequest)
.flatMap(request -> Mono.fromFuture(client.receiveMessage(request)))
.doOnNext(response -> log.debug("{} received messages from sqs", response.messages().size()))
.flatMapMany(response -> Flux.fromIterable(response.messages()));
}

private ReceiveMessageRequest getReceiveMessageRequest() {
return ReceiveMessageRequest.builder()
.queueUrl(properties.getQueueUrl())
.maxNumberOfMessages(properties.getMaxNumberOfMessages())
.waitTimeSeconds(properties.getWaitTimeSeconds())
.visibilityTimeout(properties.getVisibilityTimeout())
.build();
}

private DeleteMessageRequest getDeleteMessageRequest(String receiptHandle) {
return DeleteMessageRequest.builder()
.queueUrl(properties.getQueueUrl())
.receiptHandle(receiptHandle)
.build();
}
{{^lombok}}

public static class SQSListenerBuilder {
private SqsAsyncClient client;
private SQSProperties properties;
private Function<Message, Mono<Void>> processor;

SQSListenerBuilder() {
}

public SQSListener.SQSListenerBuilder client(final SqsAsyncClient client) {
this.client = client;
return this;
}

public SQSListener.SQSListenerBuilder properties(final SQSProperties properties) {
this.properties = properties;
return this;
}

public SQSListener.SQSListenerBuilder processor(final Function<Message, Mono<Void>> processor) {
this.processor = processor;
return this;
}

public SQSListener build() {
return new SQSListener(this.client, this.properties, this.processor);
}

}

public static SQSListener.SQSListenerBuilder builder() {
return new SQSListener.SQSListenerBuilder();
}
{{/lombok}}
}
Loading