fix: Suneet project for spring cloud kafka consumer
codedsun committed Nov 20, 2022
1 parent 8feff9e commit 91e7f19
Showing 9 changed files with 114 additions and 43 deletions.
35 changes: 35 additions & 0 deletions .yo-rc.json
@@ -0,0 +1,35 @@
{
"generator-jhipster": {
"applicationType": "monolith",
"authenticationType": "jwt",
"baseName": "sampleMongoKafka",
"blueprints": [],
"buildTool": "maven",
"cacheProvider": "ehcache",
"clientFramework": "angular",
"clientPackageManager": "npm",
"clientTheme": "none",
"creationTimestamp": 1596513172471,
"databaseType": "mongodb",
"devDatabaseType": "mongodb",
"dtoSuffix": "DTO",
"embeddableLaunchScript": false,
"enableHibernateCache": false,
"enableTranslation": true,
"entitySuffix": "",
"jhiPrefix": "myPrefix",
"jwtSecretKey": "ZjY4MTM4YjI5YzMwZjhjYjI2OTNkNTRjMWQ5Y2Q0Y2YwOWNmZTE2NzRmYzU3NTMwM2NjOTE3MTllOTM3MWRkMzcyYTljMjVmNmQ0Y2MxOTUzODc0MDhhMTlkMDIxMzI2YzQzZDM2ZDE3MmQ3NjVkODk3OTVmYzljYTQyZDNmMTQ=",
"languages": ["en", "fr"],
"messageBroker": "kafka",
"nativeLanguage": "en",
"otherModules": [],
"packageFolder": "tech/jhipster/sample",
"packageName": "tech.jhipster.sample",
"prodDatabaseType": "mongodb",
"searchEngine": false,
"serverPort": "8080",
"serviceDiscoveryType": false,
"testFrameworks": ["gatling", "cucumber", "cypress"],
"websocket": false
}
}
3 changes: 3 additions & 0 deletions suneet/pom.xml
@@ -255,6 +255,9 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<type>test-jar</type>
<classifier>test-binder</classifier>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
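
The test-jar/test-binder classifier added above pulls in Spring Cloud Stream's in-memory test binder, which the reworked integration test further down relies on. As a rough sketch of how that binder is usually activated (the class name here is made up; the real test is shown later in this commit):

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class TestBinderActivationSketchIT {

    @Autowired
    private OutputDestination output;

    @Test
    void bindingsWorkWithoutABroker() {
        // Anything the application publishes through its output bindings can be read
        // back here with output.receive(...), no running Kafka instance required.
    }
}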
suneet/src/main/java/tech/jhipster/sample/config/KafkaSseConsumer.java
@@ -1,11 +1,11 @@
package tech.jhipster.sample.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
// import org.springframework.cloud.stream.annotation.Input;
// import org.springframework.messaging.MessageChannel;

public interface KafkaSseConsumer {
String CHANNELNAME = "binding-in-sse";

@Input(CHANNELNAME)
MessageChannel input();
// @Input(CHANNELNAME)
// MessageChannel input();
}
suneet/src/main/java/tech/jhipster/sample/config/KafkaSseProducer.java
@@ -1,11 +1,11 @@
package tech.jhipster.sample.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
// import org.springframework.cloud.stream.annotation.Output;
// import org.springframework.messaging.MessageChannel;

public interface KafkaSseProducer {
String CHANNELNAME = "binding-out-sse";

@Output(CHANNELNAME)
MessageChannel output();
// @Output(CHANNELNAME)
// MessageChannel output();
}
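
The commented-out channel above (and the matching KafkaSseConsumer channel in the previous file) is superseded in this commit by the functional model: StreamBridge for sending and a Consumer bean for receiving. A condensed sketch of that pattern, with made-up class and bean names (the real wiring is in WebConfigurer and SampleMongoKafkaKafkaResource below):

import java.util.function.Consumer;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class SseStreamSketch {

    private final StreamBridge streamBridge;

    public SseStreamSketch(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Replaces MessageChannel.send(...) on the @Output("binding-out-sse") channel.
    public void publish(String payload) {
        streamBridge.send("codedsun-out-0", payload);
    }

    // Replaces the @StreamListener on the @Input("binding-in-sse") channel; the binder
    // invokes this Consumer for each incoming record on its "-in-0" binding.
    @Bean
    public Consumer<Message<String>> sseConsumer() {
        return message -> System.out.println(message.getPayload());
    }
}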
suneet/src/main/java/tech/jhipster/sample/config/WebConfigurer.java
@@ -6,17 +6,25 @@
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.common.protocol.types.Field.Str;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.server.*;
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.env.Profiles;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
@@ -26,7 +34,6 @@
/**
* Configuration of web application with Servlet 3.0 APIs.
*/
@EnableBinding({ KafkaSseConsumer.class, KafkaSseProducer.class })
@Configuration
public class WebConfigurer implements ServletContextInitializer, WebServerFactoryCustomizer<WebServerFactory> {

@@ -98,4 +105,14 @@ public CorsFilter corsFilter() {
}
return new CorsFilter(source);
}

@Bean
public Supplier<GenericMessage> codedsun() {
Map<String, Object> map = new HashMap<>();
map.put(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE);
MessageHeaders headers = new MessageHeaders(map);
return () -> {
return new GenericMessage<String>("suneet", headers);
};
}
}
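
A note on the new codedsun() bean: with @EnableBinding removed, Spring Cloud Stream's functional model derives binding names from bean names, so a Supplier bean named codedsun maps to the output binding codedsun-out-0, the name the bindings in application.yml are renamed to below. A typed variant, shown only as a sketch using the imports already added above (the committed code uses the raw GenericMessage type):

@Bean
public Supplier<Message<String>> codedsunTyped() {
    Map<String, Object> map = new HashMap<>();
    map.put(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE);
    MessageHeaders headers = new MessageHeaders(map);
    // The binder polls an active supplier on its default schedule (roughly once per second),
    // so this emits the same "suneet" payload repeatedly to its "-out-0" binding.
    return () -> new GenericMessage<>("suneet", headers);
}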
suneet/src/main/java/tech/jhipster/sample/web/rest/SampleMongoKafkaKafkaResource.java
@@ -7,10 +7,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
// import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -28,13 +32,14 @@
public class SampleMongoKafkaKafkaResource {

private final Logger log = LoggerFactory.getLogger(SampleMongoKafkaKafkaResource.class);
private final MessageChannel output;
private static final String PRODUCER_BINDING_NAME = "codedsun-out-0";
private final StreamBridge streamBridge;

// TODO implement state of the art emitter repository to become 12 factor
private Map<String, SseEmitter> emitters = new HashMap<>();

public SampleMongoKafkaKafkaResource(@Qualifier(KafkaSseProducer.CHANNELNAME) MessageChannel output) {
this.output = output;
public SampleMongoKafkaKafkaResource(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}

@PostMapping("/publish")
@@ -43,7 +48,7 @@ public void publish(@RequestParam String message) {
Map<String, Object> map = new HashMap<>();
map.put(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE);
MessageHeaders headers = new MessageHeaders(map);
output.send(new GenericMessage<>(message, headers));
streamBridge.send(PRODUCER_BINDING_NAME, new GenericMessage<>(message, headers));
}

@GetMapping("/register")
@@ -62,19 +67,22 @@ public void unregister(Principal principal) {
Optional.ofNullable(emitters.get(user)).ifPresent(ResponseBodyEmitter::complete);
}

@StreamListener(value = KafkaSseConsumer.CHANNELNAME, copyHeaders = "false")
public void consume(Message<String> message) {
log.debug("Got message from kafka stream: {}", message.getPayload());
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(message.getPayload(), MediaType.TEXT_PLAIN));
} catch (IOException e) {
log.debug("error sending sse message, {}", message.getPayload());
}
});
@Bean
public Consumer<Message<String>> consume() {
return message -> {
log.debug("Got message from kafka stream: {}", message.getPayload());
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(message.getPayload(), MediaType.TEXT_PLAIN));
} catch (IOException e) {
log.debug("error sending sse message, {}", message.getPayload());
}
});
};
}

}
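
Because the new consume() method is an ordinary @Bean returning java.util.function.Consumer, it can also be driven directly in a plain unit test, without Kafka or a Spring context. A minimal sketch, not part of the commit (class name and package import are assumptions; null is passed because the consumer path never touches StreamBridge):

import org.junit.jupiter.api.Test;
import org.springframework.messaging.support.GenericMessage;
import tech.jhipster.sample.web.rest.SampleMongoKafkaKafkaResource; // package assumed

class KafkaConsumerFunctionSketchTest {

    @Test
    void consumerAcceptsMessagesDirectly() {
        // No SseEmitter is registered, so this only exercises the logging path,
        // but it shows the call the binder makes for each incoming record.
        SampleMongoKafkaKafkaResource resource = new SampleMongoKafkaKafkaResource(null);
        resource.consume().accept(new GenericMessage<>("value-from-kafka"));
    }
}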
4 changes: 2 additions & 2 deletions suneet/src/main/resources/config/application.yml
@@ -103,11 +103,11 @@ spring:
auto-create-topics: true
brokers: localhost:9092
bindings:
binding-in-sse:
codedsun-in-0:
destination: sse-topic
content-type: text/plain
group: sample-mongo-kafka
binding-out-sse:
codedsun-out-0:
destination: sse-topic
content-type: text/plain
group: sample-mongo-kafka
suneet/src/test/java/tech/jhipster/sample/web/rest/SampleMongoKafkaKafkaResourceIT.java
@@ -13,7 +13,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
// import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
@@ -33,26 +35,32 @@
@EmbeddedKafka
class SampleMongoKafkaKafkaResourceIT {

@Autowired
private MessageCollector collector;
// @Autowired
// private MessageCollector collector;

@Autowired
private MockMvc restMockMvc;

// @Autowired
// @Qualifier(KafkaSseProducer.CHANNELNAME)
// private MessageChannel output;

// @Autowired
// @Qualifier(KafkaSseConsumer.CHANNELNAME)
// private MessageChannel input;

@Autowired
@Qualifier(KafkaSseProducer.CHANNELNAME)
private MessageChannel output;
private InputDestination input;

@Autowired
@Qualifier(KafkaSseConsumer.CHANNELNAME)
private MessageChannel input;
private OutputDestination output;

@Test
void producesMessages() throws Exception {
restMockMvc.perform(post("/api/sample-mongo-kafka-kafka/publish?message=value-produce")).andExpect(status().isOk());
BlockingQueue<Message<?>> messages = collector.forChannel(output);
GenericMessage<String> payload = (GenericMessage<String>) messages.take();
assertThat(payload.getPayload()).isEqualTo("value-produce");
// BlockingQueue<Message<?>> messages = collector.forChannel(output);
// GenericMessage<String> payload = (GenericMessage<String>) messages.take();
assertThat(output.receive().getPayload()).isEqualTo("value-produce");
}

@Test
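
One hedged note on the rewritten assertion above (not part of the commit): the test binder's OutputDestination.receive(...) returns a Message whose payload is a byte[], so comparisons against a plain String usually decode the payload first, along these lines:

// Assumes the autowired OutputDestination field "output" from the test above,
// plus an import of java.nio.charset.StandardCharsets.
byte[] payload = output.receive(1000).getPayload();
assertThat(new String(payload, StandardCharsets.UTF_8)).isEqualTo("value-produce");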
4 changes: 2 additions & 2 deletions suneet/src/test/resources/config/application.yml
@@ -32,11 +32,11 @@ spring:
replicationFactor: 1
auto-create-topics: true
bindings:
binding-in-sse:
codedsun-in-0:
destination: sse-topic
content-type: text/plain
group: sample-mongo-kafka
binding-out-sse:
codedsun-out-0:
destination: sse-topic
content-type: text/plain
group: sample-mongo-kafka
