Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka listener in notification service #3

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6d8ce60
Add notification domain
NenadJeckovic Apr 23, 2024
deb748a
Add notification services and ports
NenadJeckovic Apr 23, 2024
4a99f29
Add security configuration
NenadJeckovic Apr 23, 2024
c913230
Add notification persistence
NenadJeckovic Apr 23, 2024
d704b14
Add notification endpoints
NenadJeckovic Apr 23, 2024
4610376
Add kafka notification message consumer
NenadJeckovic Apr 23, 2024
dd1d88a
Add tests setup
NenadJeckovic Apr 23, 2024
7311357
Add security test
NenadJeckovic Apr 23, 2024
8ef02af
Add service tests
NenadJeckovic Apr 23, 2024
169ec13
Add persistence tests
NenadJeckovic Apr 23, 2024
d9f306d
Add integration tests
NenadJeckovic Apr 23, 2024
ab580f6
Add created date to notification
NenadJeckovic Apr 30, 2024
f72a77b
Fix notification tests
NenadJeckovic Apr 30, 2024
999f3dd
Fix code smells
NenadJeckovic May 8, 2024
949fc4a
Add kafka message receiving test
NenadJeckovic May 8, 2024
d425067
Fix kafka message receiving test
NenadJeckovic May 8, 2024
e204f02
Fix kafka message receiving test
NenadJeckovic May 8, 2024
6a33a10
Fix kafka message receiving test
NenadJeckovic May 8, 2024
bfe2105
Debug test
NenadJeckovic May 8, 2024
b4c722d
Debug test
NenadJeckovic May 8, 2024
6e8fa74
Debug test
NenadJeckovic May 8, 2024
3e7d509
Debug test
NenadJeckovic May 8, 2024
d3599ee
Debug test
NenadJeckovic May 8, 2024
a9f8a02
Fix test
NenadJeckovic May 8, 2024
55458c2
Fix test
NenadJeckovic May 8, 2024
343e9da
Fix test
NenadJeckovic May 8, 2024
c27ba0d
Add lombok config
NenadJeckovic May 9, 2024
b0d2fee
Debug test
NenadJeckovic May 9, 2024
121d7a0
Debug test
NenadJeckovic May 10, 2024
4a9a674
Fix test
NenadJeckovic May 10, 2024
f01027b
Fix test
NenadJeckovic May 10, 2024
c3a3444
Fix test again
NenadJeckovic May 10, 2024
3e8bd91
Debug test
NenadJeckovic May 10, 2024
65333bb
Debug test
NenadJeckovic May 10, 2024
55bfc5b
Debug test
NenadJeckovic May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lombok.addLombokGeneratedAnnotation = true
56 changes: 56 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<description>Notification service for rbc library</description>
<properties>
<java.version>17</java.version>
<mapstruct.version>1.4.2.Final</mapstruct.version>
<sonar.organization>productdock</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.projectKey>ProductDock_rbc-library-notification</sonar.projectKey>
Expand All @@ -24,6 +25,19 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
Expand All @@ -39,6 +53,48 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.productdock.library.notification;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
@EnableAutoConfiguration
@EnableMongoRepositories
public class NotificationApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.productdock.library.notification.adapter.in.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.productdock.library.notification.adapter.in.kafka.messages.NotificationMapper;
import com.productdock.library.notification.adapter.in.kafka.messages.NotificationMessage;
import com.productdock.library.notification.application.port.in.AddNotificationUseCase;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.OffsetDateTime;

@Component
@Slf4j
@AllArgsConstructor
public class KafkaConsumer {

private NotificationMapper notificationMapper;
private ObjectMapper objectMapper;
private AddNotificationUseCase addNotificationUseCase;

@KafkaListener(topics = "${spring.kafka.topic.notifications}")
public synchronized void listenNotifications(ConsumerRecord<String, String> message) throws JsonProcessingException {
log.info("Received notification kafka message: {}", message);

var notificationMessage = objectMapper.readValue(message.value(), NotificationMessage.class);
var notification = notificationMapper.toDomain(notificationMessage);
notification.setCreatedDate(OffsetDateTime.now());
addNotificationUseCase.saveNotification(notification);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.productdock.library.notification.adapter.in.kafka.messages;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

@AllArgsConstructor
@Getter
@Builder
public class ActionMessage {
private String type;
private String target;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.productdock.library.notification.adapter.in.kafka.messages;

import com.productdock.library.notification.domain.Notification;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.ReportingPolicy;

@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE, componentModel = "spring")
public interface NotificationMapper {

@Mapping(target = "read", ignore = true)
@Mapping(target = "id", ignore = true)
@Mapping(target = "createdDate", ignore = true)
public Notification toDomain(NotificationMessage source);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.productdock.library.notification.adapter.in.kafka.messages;

import lombok.AllArgsConstructor;
import lombok.Builder;

@Builder
@AllArgsConstructor
public class NotificationMessage {
public final String title;
public final String description;
public final String userId;
public final ActionMessage action;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.productdock.library.notification.adapter.in.web;

import com.productdock.library.notification.application.port.in.GetNotificationsQuery;
import com.productdock.library.notification.application.port.in.ReadNotificationsUseCase;
import com.productdock.library.notification.domain.Notification;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("api/notifications")
@Slf4j
@AllArgsConstructor
public class NotificationsApi {

private GetNotificationsQuery getNotificationsQuery;
private ReadNotificationsUseCase readNotificationsUseCase;

public static final String CLAIM_EMAIL = "email";

@GetMapping
public List<Notification> getNotifications(Authentication authentication) {
var userId = getUserId(authentication);
return getNotificationsQuery.getNotifications(userId);
}

@GetMapping("/unread")
public int getUnreadCount(Authentication authentication){
var userId = getUserId(authentication);
return getNotificationsQuery.getUnreadNotificationsCount(userId);
}

@PostMapping("/read")
public void readNotifications(Authentication authentication) {
var userId = getUserId(authentication);
readNotificationsUseCase.markAsReadNotifications(userId);
}

private String getUserId(Authentication authentication) {
return ((Jwt) authentication.getCredentials()).getClaim(CLAIM_EMAIL).toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.productdock.library.notification.adapter.out.mongo;

import com.productdock.library.notification.adapter.out.mongo.enitity.NotificationEntity;
import com.productdock.library.notification.adapter.out.mongo.mapper.NotificationEntityMapper;
import com.productdock.library.notification.application.port.out.persistence.NotificationPersistenceOutPort;
import com.productdock.library.notification.domain.Notification;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
@Slf4j
@AllArgsConstructor
public class NotificationPersistenceAdapter implements NotificationPersistenceOutPort {

private NotificationRepository notificationRepository;
private NotificationEntityMapper mapper;
private MongoTemplate mongoTemplate;

@Override
public List<Notification> getAllByUserId(String userId) {
return notificationRepository.findAllByUserId(userId)
.stream().map(notificationEntity -> mapper.toDomain(notificationEntity)).toList();
}

@Override
public List<Notification> getUnreadByUserId(String userId) {
return notificationRepository.findAllByReadAndUserId(false, userId)
.stream().map(notificationEntity -> mapper.toDomain(notificationEntity)).toList();
}

@Override
public void save(Notification notification) {
log.info("Notification for saving: {}", notification);
var saved = notificationRepository.save(mapper.toEntity(notification));
List<NotificationEntity> savedNotification = notificationRepository.findAllByUserId(notification.getUserId());
log.info("Saved notification: {}", savedNotification);
}

@Override
public void saveAll(List<Notification> notifications) {
notificationRepository.saveAll(notifications.stream().map(notification -> mapper.toEntity(notification)).toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.productdock.library.notification.adapter.out.mongo;

import com.productdock.library.notification.adapter.out.mongo.enitity.NotificationEntity;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface NotificationRepository extends MongoRepository<NotificationEntity, String> {

List<NotificationEntity> findAllByUserId(String userId);

List<NotificationEntity> findAllByReadAndUserId(boolean read, String userId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.productdock.library.notification.adapter.out.mongo.enitity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ActionEntity implements Serializable {
private String type;
private String target;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.productdock.library.notification.adapter.out.mongo.enitity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.io.Serializable;

@Document("notifications")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class NotificationEntity implements Serializable {
@Id
private String id;
private String title;
private String description;
private String userId;
private boolean read;
private String createdDate;
private ActionEntity action;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.productdock.library.notification.adapter.out.mongo.mapper;

import com.productdock.library.notification.adapter.out.mongo.enitity.NotificationEntity;
import com.productdock.library.notification.domain.Notification;
import org.mapstruct.Mapper;
import org.mapstruct.ReportingPolicy;

import java.time.OffsetDateTime;

@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE, componentModel = "spring")
public interface NotificationEntityMapper {
NotificationEntity toEntity(Notification source);
Notification toDomain(NotificationEntity source);

default String map(OffsetDateTime value){
return value.toString();
}

default OffsetDateTime map(String value) {
return OffsetDateTime.parse(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.productdock.library.notification.application.port.in;

import com.productdock.library.notification.domain.Notification;

public interface AddNotificationUseCase {

void saveNotification(Notification notification);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.productdock.library.notification.application.port.in;

import com.productdock.library.notification.domain.Notification;

import java.util.List;

public interface GetNotificationsQuery {

List<Notification> getNotifications(String userId);

int getUnreadNotificationsCount(String userId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.productdock.library.notification.application.port.in;

public interface ReadNotificationsUseCase {

void markAsReadNotifications(String userId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.productdock.library.notification.application.port.out.persistence;

import com.productdock.library.notification.domain.Notification;

import java.util.List;

public interface NotificationPersistenceOutPort {

List<Notification> getAllByUserId(String userId);

List<Notification> getUnreadByUserId(String userId);

void save(Notification notification);

void saveAll(List<Notification> notifications);
}
Loading
Loading