Skip to content

Commit

Permalink
[fix] 매입 이력 알람 리스너 문제 해결 (#438)
Browse files Browse the repository at this point in the history
* refactor: 현재가 갱신 단순 스트림 방식으로 변경

* refactor: 불필요한 메서드 제거

* refactor: toArray 메서드로 분리

* feat: Flux 방식으로 변경

* fix: Mono.error 모킹 해결

* feat: 현재가 조회 부분을 반복문에서 람다로 변경

* refactor: Flux 형태로 현잭가 조회 개선

* style: 코드 정리

* style: 코드 정리

* feat: 컬렉션 필드를 불변 컬렉션으로 반환하도록 변경

PortfolioHolding 클래스의 getPurchaseHistory 메서드 같은 경우 서비스에 문제점이 있어서 todo 남김

* feat: purchaseHistory 컬렉션을 반환시 불변 컬렉션으로 반환

* fix: todo 제거

* feat: 불변 컬렉션 반환으로 인한 코드 제거

* style: 코드 정리

* style: 로깅 제거

* fix: @EnableAsync 애노테이션을 추가하여 비동기 활성화 및 이벤트 리스너의 반환 타입을 CompletableFuture로 변경

* test: PurchaseHistoryServiceTest에 있는 매입 이력 이벤트(추가/삭제)에 따른 알림 이벤트 테스트를 제거하고 리스너 테스트에 최대 손실 알림 테스트 추가

* feat: @eventlistener를 @TransactionalEventListener로 변경
  • Loading branch information
yonghwankim-dev authored Aug 16, 2024
1 parent 8959077 commit 27ec6d8
Show file tree
Hide file tree
Showing 23 changed files with 161 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
Expand All @@ -34,7 +33,7 @@ public List<HolidayDto> read() throws IOException {
.map(parts -> new HolidayDto(
LocalDate.parse(parts[0], formatter),
parts[1]))
.collect(Collectors.toList());
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
Expand Down Expand Up @@ -238,4 +239,8 @@ public Expression getLastDayClosingPrice(ClosingPriceRepository manager) {
public boolean hasAuthorization(Long memberId) {
return portfolio.hasAuthorization(memberId);
}

public List<PurchaseHistory> getPurchaseHistory() {
return Collections.unmodifiableList(purchaseHistory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@JsonDeserialize(using = KisDividendWrapper.KissDividendWrapperDeserializer.class)
// TODO: KisDividend 클래스의 역직렬화를 수정하여 래퍼 클래스가 없도록 수정
public class KisDividendWrapper {
private List<KisDividend> kisDividends;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class CurrentPriceRedisRepository implements PriceRepository {
private static final String CURRENT_PRICE_FORMAT = "cp:%s";
private final RedisTemplate<String, String> redisTemplate;
private final KisClient kisClient;

@Override
public void savePrice(KisCurrentPrice... currentPrices) {
Arrays.stream(currentPrices).forEach(this::savePrice);
Expand Down
58 changes: 17 additions & 41 deletions src/main/java/codesquad/fineants/domain/kis/service/KisService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import codesquad.fineants.domain.kis.repository.ClosingPriceRepository;
import codesquad.fineants.domain.kis.repository.CurrentPriceRedisRepository;
import codesquad.fineants.domain.kis.repository.HolidayRepository;
import codesquad.fineants.domain.kis.repository.KisAccessTokenRepository;
import codesquad.fineants.domain.notification.event.publisher.PortfolioPublisher;
import codesquad.fineants.domain.stock.domain.dto.response.StockDataResponse;
import codesquad.fineants.domain.stock.domain.entity.Stock;
import codesquad.fineants.domain.stock_target_price.domain.entity.StockTargetPrice;
import codesquad.fineants.domain.stock_target_price.event.publisher.StockTargetPricePublisher;
import codesquad.fineants.domain.stock_target_price.repository.StockTargetPriceRepository;
import codesquad.fineants.global.common.delay.DelayManager;
import codesquad.fineants.global.errors.exception.KisException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,18 +55,18 @@ public class KisService {

private final KisClient kisClient;
private final PortfolioHoldingRepository portFolioHoldingRepository;
private final KisAccessTokenRepository manager;
private final CurrentPriceRedisRepository currentPriceRedisRepository;
private final ClosingPriceRepository closingPriceRepository;
private final HolidayRepository holidayRepository;
private final StockTargetPricePublisher stockTargetPricePublisher;
private final PortfolioPublisher portfolioPublisher;
private final StockTargetPriceRepository stockTargetPriceRepository;
private final DelayManager delayManager;

// 평일 9am ~ 15:59pm 5초마다 현재가 갱신 수행
@Profile(value = "production")
@Scheduled(cron = "0/5 * 9-15 ? * MON,TUE,WED,THU,FRI")
@Transactional(readOnly = true)
@Transactional
public void refreshCurrentPrice() {
// 휴장일인 경우 실행하지 않음
if (holidayRepository.isHoliday(LocalDate.now())) {
Expand All @@ -76,7 +76,7 @@ public void refreshCurrentPrice() {
}

// 회원이 가지고 있는 모든 종목에 대하여 현재가 갱신
@Transactional(readOnly = true)
@Transactional
public List<KisCurrentPrice> refreshAllStockCurrentPrice() {
Set<String> totalTickerSymbol = new HashSet<>();
totalTickerSymbol.addAll(portFolioHoldingRepository.findAllTickerSymbol());
Expand All @@ -85,7 +85,6 @@ public List<KisCurrentPrice> refreshAllStockCurrentPrice() {
.map(Stock::getTickerSymbol)
.collect(Collectors.toSet()));
List<String> totalTickerSymbolList = totalTickerSymbol.stream().toList();

List<KisCurrentPrice> prices = this.refreshStockCurrentPrice(totalTickerSymbolList);
stockTargetPricePublisher.publishEvent(totalTickerSymbolList);
portfolioPublisher.publishCurrentPriceEvent();
Expand All @@ -95,32 +94,23 @@ public List<KisCurrentPrice> refreshAllStockCurrentPrice() {
// 주식 현재가 갱신
@Transactional(readOnly = true)
public List<KisCurrentPrice> refreshStockCurrentPrice(List<String> tickerSymbols) {
List<CompletableFuture<KisCurrentPrice>> futures = tickerSymbols.stream()
.map(this::submitCurrentPriceFuture)
.toList();

List<KisCurrentPrice> prices = futures.stream()
.map(future -> {
try {
return future.get(1L, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error(e.getMessage());
return null;
}
})
.filter(Objects::nonNull)
int concurrency = 20;
List<KisCurrentPrice> prices = Flux.fromIterable(tickerSymbols)
.flatMap(ticker -> this.fetchCurrentPrice(ticker)
.doOnSuccess(kisCurrentPrice -> log.debug("reload stock current price {}", kisCurrentPrice))
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, delayManager.fixedDelay())), concurrency)
.delayElements(delayManager.delay())
.collectList()
.blockOptional(delayManager.timeout())
.orElseGet(Collections::emptyList).stream()
.toList();

prices.forEach(currentPriceRedisRepository::savePrice);

currentPriceRedisRepository.savePrice(toArray(prices));
log.info("종목 현재가 {}개중 {}개 갱신", tickerSymbols.size(), prices.size());
return prices;
}

private CompletableFuture<KisCurrentPrice> submitCurrentPriceFuture(String tickerSymbol) {
CompletableFuture<KisCurrentPrice> future = createCompletableFuture();
executorService.schedule(createCurrentPriceRunnable(tickerSymbol, future), 1L, TimeUnit.SECONDS);
return future;
private KisCurrentPrice[] toArray(List<KisCurrentPrice> prices) {
return prices.toArray(KisCurrentPrice[]::new);
}

private <T> CompletableFuture<T> createCompletableFuture() {
Expand All @@ -133,21 +123,8 @@ private <T> CompletableFuture<T> createCompletableFuture() {
return future;
}

private Runnable createCurrentPriceRunnable(String tickerSymbol, CompletableFuture<KisCurrentPrice> future) {
return () -> {
try {
future.complete(fetchCurrentPrice(tickerSymbol)
.blockOptional(Duration.ofMinutes(1L))
.orElseGet(() -> KisCurrentPrice.empty(tickerSymbol))
);
} catch (KisException e) {
future.completeExceptionally(e);
}
};
}

public Mono<KisCurrentPrice> fetchCurrentPrice(String tickerSymbol) {
return kisClient.fetchCurrentPrice(tickerSymbol);
return Mono.defer(() -> kisClient.fetchCurrentPrice(tickerSymbol));
}

// 15시 30분에 종가 갱신 수행
Expand Down Expand Up @@ -251,7 +228,6 @@ public Mono<KisSearchStockInfo> fetchSearchStockInfo(String tickerSymbol) {
* 하루전부터 오늘까지의 상장된 종목들의 정보를 조회한다.
* @return 종목 정보 리스트
*/

public Set<StockDataResponse.StockIntegrationInfo> fetchStockInfoInRangedIpo() {
LocalDate today = LocalDate.now();
LocalDate yesterday = today.minusDays(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -140,4 +141,8 @@ public Map<String, Object> toMemberAttributeMap() {
.collect(Collectors.toSet()));
return result;
}

public Set<MemberRole> getRoles() {
return Collections.unmodifiableSet(roles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public List<NotificationSaveResponse> saveNotification(List<SentNotifyMessage> m

/**
* 포트폴리오 알림 저장
*
* @param request 알림 데이터
* @return 알림 저장 결과
*/
Expand All @@ -89,6 +90,7 @@ private Member findMember(Long memberId) {

/**
* 모든 회원을 대상으로 목표 수익률을 만족하는 포트폴리오에 대해서 목표 수익률 달성 알림 푸시
*
* @return 알림 전송 결과
*/
@Transactional
Expand All @@ -105,6 +107,7 @@ public NotifyMessageResponse notifyTargetGainAll() {

/**
* 특정 포트폴리오의 목표 수익률 달성 알림 푸시
*
* @param portfolioId 포트폴리오 등록번호
* @return 알림 전송 결과
*/
Expand Down Expand Up @@ -166,6 +169,7 @@ private Map<String, String> getMessageIdMap(List<SentNotifyMessage> sentNotifyMe

/**
* 모든 포트폴리오를 대상으로 최대 손실율에 도달하는 모든 포트폴리오에 대해서 최대 손실율 도달 알림 푸시
*
* @return 알림 전송 결과
*/
@Transactional
Expand All @@ -181,6 +185,7 @@ public NotifyMessageResponse notifyMaxLossAll() {

/**
* 특정 포트폴리오의 최대 손실율 달성 알림 푸시
*
* @param portfolioId 포트폴리오 등록번호
* @return 알림 전송 결과
*/
Expand All @@ -198,6 +203,7 @@ public NotifyMessageResponse notifyMaxLoss(Long portfolioId) {

/**
* 모든 회원을 대상으로 특정 종목들에 대한 종목 지정가 알림 발송
*
* @param tickerSymbols 종목의 티커 심볼 리스트
* @return 알림 전송 결과
*/
Expand All @@ -220,6 +226,7 @@ public NotifyMessageResponse notifyTargetPriceToAllMember(List<String> tickerSym

/**
* 특정 회원을 대상으로 종목 지정가 알림 발송
*
* @param memberId 회원의 등록번호
* @return 알림 전송 결과
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -528,4 +529,8 @@ public NotificationPreference getNotificationPreference() {
public NotifyMessage getTargetPriceMessage(String token) {
throw new UnsupportedOperationException("This method is not supported for Portfolio");
}

public List<PortfolioHolding> getPortfolioHoldings() {
return Collections.unmodifiableList(portfolioHoldings);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package codesquad.fineants.domain.purchasehistory.event.listener;

import org.springframework.context.event.EventListener;
import java.util.concurrent.CompletableFuture;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;

import codesquad.fineants.domain.notification.domain.dto.response.PortfolioNotifyMessagesResponse;
import codesquad.fineants.domain.notification.service.NotificationService;
Expand All @@ -20,21 +22,19 @@ public class PurchaseHistoryEventListener {

// 매입 이력 이벤트가 발생하면 포트폴리오 목표수익률에 달성하면 푸시 알림
@Async
@EventListener
public void notifyTargetGainBy(PushNotificationEvent event) {
@TransactionalEventListener
public CompletableFuture<PortfolioNotifyMessagesResponse> notifyTargetGainBy(PushNotificationEvent event) {
PurchaseHistoryEventSendableParameter parameter = event.getValue();
PortfolioNotifyMessagesResponse response = (PortfolioNotifyMessagesResponse)notificationService.notifyTargetGain(
parameter.getPortfolioId());
log.info("매입 이력 이벤트로 인한 목표 수익률 달성 알림 결과 : {}", response);
return CompletableFuture.supplyAsync(() ->
(PortfolioNotifyMessagesResponse)notificationService.notifyTargetGain(parameter.getPortfolioId()));
}

// 매입 이력 이벤트가 발생하면 포트폴리오 최대손실율에 도달하면 푸시 알림
@Async
@EventListener
public void notifyMaxLoss(PushNotificationEvent event) {
@TransactionalEventListener
public CompletableFuture<PortfolioNotifyMessagesResponse> notifyMaxLoss(PushNotificationEvent event) {
PurchaseHistoryEventSendableParameter parameter = event.getValue();
PortfolioNotifyMessagesResponse response = (PortfolioNotifyMessagesResponse)notificationService.notifyMaxLoss(
parameter.getPortfolioId());
log.info("매입 이력 이벤트로 인한 최대 손실율 달성 알림 결과 : response={}", response);
return CompletableFuture.supplyAsync(() ->
(PortfolioNotifyMessagesResponse)notificationService.notifyMaxLoss(parameter.getPortfolioId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ public PurchaseHistoryDeleteResponse deletePurchaseHistory(Long portfolioHolding
PurchaseHistory deletePurchaseHistory = findPurchaseHistory(purchaseHistoryId);
repository.deleteById(purchaseHistoryId);

// 매입 이력 알람 이벤트를 위한 매입 이력 데이터 삭제
findPortfolio(portfolioId).getPortfolioHoldings().stream()
.filter(holding -> holding.getId().equals(portfolioHoldingId))
.findAny()
.orElseThrow(() -> new FineAntsException(PortfolioHoldingErrorCode.NOT_FOUND_PORTFOLIO_HOLDING))
.getPurchaseHistory().remove(deletePurchaseHistory);

purchaseHistoryEventPublisher.publishPushNotificationEvent(portfolioId, memberId);
return PurchaseHistoryDeleteResponse.from(deletePurchaseHistory, portfolioId, memberId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -237,4 +238,8 @@ public String toCsvLineString() {
market.name(),
sector);
}

public List<StockDividend> getStockDividends() {
return Collections.unmodifiableList(stockDividends);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private Map<Boolean, List<Stock>> fetchPartitionedStocksForDelisted() {
final int concurrency = 20;
return Flux.fromIterable(findAllTickerSymbols())
.flatMap(kisService::fetchSearchStockInfo, concurrency)
.delayElements(delayManager.getDelay())
.delayElements(delayManager.delay())
.collectList()
.blockOptional(TIMEOUT)
.orElseGet(Collections::emptyList).stream()
Expand Down Expand Up @@ -191,7 +191,7 @@ private List<StockDividend> fetchDividends(Set<String> tickerSymbols) {
final int concurrency = 20;
return Flux.fromIterable(tickerSymbols)
.flatMap(ticker -> kisService.fetchDividend(ticker).flatMapMany(Flux::fromIterable), concurrency)
.delayElements(delayManager.getDelay())
.delayElements(delayManager.delay())
.collectList()
.blockOptional(TIMEOUT)
.orElseGet(Collections::emptyList).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import codesquad.fineants.domain.BaseEntity;
Expand Down Expand Up @@ -50,11 +51,6 @@ public class StockTargetPrice extends BaseEntity {
@OneToMany(fetch = FetchType.LAZY, mappedBy = "stockTargetPrice")
private List<TargetPriceNotification> targetPriceNotifications;

private StockTargetPrice(Boolean isActive, Member member, Stock stock,
List<TargetPriceNotification> targetPriceNotifications) {
this(LocalDateTime.now(), null, null, isActive, member, stock, targetPriceNotifications);
}

private StockTargetPrice(LocalDateTime createAt, LocalDateTime modifiedAt, Long id, Boolean isActive, Member member,
Stock stock, List<TargetPriceNotification> targetPriceNotifications) {
super(createAt, modifiedAt);
Expand Down Expand Up @@ -84,4 +80,8 @@ public Expression getCurrentPrice(CurrentPriceRedisRepository manager) {
public boolean hasAuthorization(Long memberId) {
return member.hasAuthorization(memberId);
}

public List<TargetPriceNotification> getTargetPriceNotifications() {
return Collections.unmodifiableList(targetPriceNotifications);
}
}
Loading

0 comments on commit 27ec6d8

Please sign in to comment.