Skip to content

Commit

Permalink
feat/#294 : Notification(알림)을 SSE로 보내기 기능
Browse files Browse the repository at this point in the history
  • Loading branch information
LJH098 committed Jun 5, 2024
1 parent dbd08ff commit bd7a71e
Show file tree
Hide file tree
Showing 14 changed files with 423 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.in;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.gaebaljip.exceed.common.annotation.AuthenticationMemberId;
import com.gaebaljip.exceed.infrastructure.sse.application.port.in.ConnectEmitterUseCase;

import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
@SecurityRequirement(name = "access-token")
@Tag(name = "[SSE connection]")
public class EmitterController {
private final ConnectEmitterUseCase connectEmitterUseCase;

@GetMapping(value = "/emitter/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "")
String lastEventId,
@Parameter(hidden = true) @AuthenticationMemberId Long memberId) {
return connectEmitterUseCase.connect(memberId.toString(), lastEventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import java.util.Map;

import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Repository
public interface EmitterRepository {
SseEmitter save(String emitterId, SseEmitter sseEmitter);

void saveEventCache(String eventCachedId, Object event);

Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);

Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);

void deleteById(String emitterId);

void deleteAllEmitterStartWithMemberId(String memberId);

void deleteAllEventCacheStartWithMemberId(String memberId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCaches = new ConcurrentHashMap<>();

@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}

@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCaches.put(eventCacheId, event);
}

@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCaches.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public void deleteById(String emitterId) {
emitters.remove(emitterId);
}

@Override
public void deleteAllEmitterStartWithMemberId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
});
}

@Override
public void deleteAllEventCacheStartWithMemberId(String memberId) {
eventCaches.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCaches.remove(key);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

public enum NotificationType {
// 알람
NOTICE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import lombok.*;

@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder(toBuilder = true)
public class Notify {
private String content;
private String url;
private String type;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class NotifyConverter {
public Notify toNotify(NotifyEntity notifyEntity) {
return Notify.builder()
.content(notifyEntity.getContent())
.url(notifyEntity.getUrl())
.type(notifyEntity.getType().name())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import javax.persistence.*;

import com.gaebaljip.exceed.common.BaseEntity;
import com.gaebaljip.exceed.member.adapter.out.persistence.MemberEntity;

import lombok.*;

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@ToString
@Entity
@Table(name = NotifyEntity.ENTITY_PREFIX + "_TB")
@Builder()
public class NotifyEntity extends BaseEntity {
public static final String ENTITY_PREFIX = "NOTIFY";

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = ENTITY_PREFIX + "_PK", nullable = false)
private Long id;

@Column(name = ENTITY_PREFIX + "_URL", nullable = false)
private String url;

@Column(name = ENTITY_PREFIX + "_CONTENT", nullable = false)
private String content;

@Column(name = ENTITY_PREFIX + "_IS_READ", nullable = false)
private Boolean isRead;

@Column(name = ENTITY_PREFIX + "_TYPE", nullable = false)
@Enumerated(EnumType.STRING)
private NotificationType type;

@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "MEMBER_FK")
private MemberEntity receiver;

public static NotifyEntity createNotify(
MemberEntity receiver, String content, String url, NotificationType type) {
return NotifyEntity.builder()
.receiver(receiver)
.content(content)
.url(url)
.isRead(false)
.type(type)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import java.util.List;

import org.springframework.stereotype.Component;

import com.gaebaljip.exceed.infrastructure.sse.application.port.out.NotifyPort;
import com.gaebaljip.exceed.member.adapter.out.persistence.MemberEntity;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class NotifyPersistenceAdapter implements NotifyPort {
private final NotifyRepository notifyRepository;

@Override
public List<NotifyEntity> findByMemberEntity(MemberEntity memberEntity) {
return notifyRepository.findByMemberEntity(memberEntity);
}

@Override
public void deleteByAllByIdInQuery(List<Long> ids) {
notifyRepository.deleteByAllByIdInQuery(ids);
}

@Override
public NotifyEntity command(NotifyEntity notifyEntity) {
return notifyRepository.save(notifyEntity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.gaebaljip.exceed.infrastructure.sse.adapter.out;

import java.util.List;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;

import com.gaebaljip.exceed.member.adapter.out.persistence.MemberEntity;

public interface NotifyRepository extends JpaRepository<NotifyEntity, Long> {
List<NotifyEntity> findByMemberEntity(MemberEntity memberEntity);

@Query("delete from NotifyEntity n where n.id in :ids")
@Modifying
@Transactional
void deleteByAllByIdInQuery(List<Long> ids);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.gaebaljip.exceed.infrastructure.sse.application;

import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.gaebaljip.exceed.infrastructure.sse.adapter.out.EmitterRepository;
import com.gaebaljip.exceed.infrastructure.sse.adapter.out.NotifyRepository;
import com.gaebaljip.exceed.infrastructure.sse.application.port.in.ConnectEmitterUseCase;
import com.gaebaljip.exceed.infrastructure.sse.application.port.in.SendEmitterUseCase;
import com.gaebaljip.exceed.member.adapter.out.persistence.MemberPersistenceAdapter;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Service
@RequiredArgsConstructor
@Log4j2
public class ConnectEmitterService implements ConnectEmitterUseCase {
private final EmitterRepository emitterRepository;
private final NotifyRepository notifyRepository;
private final MemberPersistenceAdapter memberPersistenceAdapter;
private final SendEmitterUseCase sendEmitterUseCase;
private final Long CONNECT_TIMEOUT = 1000L * 60 * 60;

@Override
public SseEmitter connect(final String memberId, final String lastEventId) {
String eventId = memberId + "_" + System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(eventId, new SseEmitter(CONNECT_TIMEOUT));
// 이벤트를 정상적으로 보냈을 때
emitter.onCompletion(() -> emitterRepository.deleteById(eventId));
// SSE가 timeout 되었을 때
emitter.onTimeout(() -> emitterRepository.deleteById(eventId));
// 503 에러를 방지하기 위한 더미 데이터 전송
sendEmitterUseCase.sendNotification(
emitter, eventId, eventId, "EventStream Created. [memberId=" + memberId + "]");
// 클라이언트가 놓친 이벤트가 있다면 재전송
if (!lastEventId.isEmpty()) {
sendEmitterUseCase.sendLostData(lastEventId, memberId, eventId, emitter);
}
return emitter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.gaebaljip.exceed.infrastructure.sse.application.port.in;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.gaebaljip.exceed.common.annotation.UseCase;

@UseCase
public interface ConnectEmitterUseCase {
SseEmitter connect(final String memberId, final String lastEventId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.gaebaljip.exceed.infrastructure.sse.application.port.in;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.gaebaljip.exceed.common.annotation.UseCase;
import com.gaebaljip.exceed.infrastructure.sse.adapter.out.NotificationType;
import com.gaebaljip.exceed.member.adapter.out.persistence.MemberEntity;

@UseCase
public interface SendEmitterUseCase {
void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter);

void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data);

void send(MemberEntity receiver, String content, String url, NotificationType type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.gaebaljip.exceed.infrastructure.sse.application.port.out;

import java.util.List;

import com.gaebaljip.exceed.common.annotation.Port;
import com.gaebaljip.exceed.infrastructure.sse.adapter.out.NotifyEntity;
import com.gaebaljip.exceed.member.adapter.out.persistence.MemberEntity;

@Port
public interface NotifyPort {
List<NotifyEntity> findByMemberEntity(MemberEntity memberEntity);

void deleteByAllByIdInQuery(List<Long> ids);

NotifyEntity command(NotifyEntity notifyEntity);
}
Loading

0 comments on commit bd7a71e

Please sign in to comment.