diff --git a/application/src/main/java/run/halo/app/content/comment/CommentServiceImpl.java b/application/src/main/java/run/halo/app/content/comment/CommentServiceImpl.java index c1587c868b..217eb57344 100644 --- a/application/src/main/java/run/halo/app/content/comment/CommentServiceImpl.java +++ b/application/src/main/java/run/halo/app/content/comment/CommentServiceImpl.java @@ -4,10 +4,12 @@ import static run.halo.app.extension.index.query.QueryFactory.equal; import static run.halo.app.extension.index.query.QueryFactory.isNull; +import java.time.Duration; import java.time.Instant; import java.util.Set; import java.util.function.Function; import org.apache.commons.lang3.BooleanUtils; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.domain.Sort; import org.springframework.lang.NonNull; import org.springframework.security.core.context.ReactiveSecurityContextHolder; @@ -15,6 +17,7 @@ import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import run.halo.app.core.extension.User; import run.halo.app.core.extension.content.Comment; import run.halo.app.core.extension.service.RoleService; @@ -147,17 +150,33 @@ public Mono create(Comment comment) { @Override public Mono removeBySubject(@NonNull Ref subjectRef) { Assert.notNull(subjectRef, "The subjectRef must not be null."); + return cleanupComments(subjectRef, 200); + } + + private Mono cleanupComments(Ref subjectRef, int batchSize) { // ascending order by creation time and name - var pageRequest = PageRequestImpl.of(1, 200, + final var pageRequest = PageRequestImpl.of(1, batchSize, Sort.by("metadata.creationTimestamp", "metadata.name")); - return Flux.defer(() -> listCommentsByRef(subjectRef, pageRequest)) - .expand(page -> page.hasNext() - ? listCommentsByRef(subjectRef, pageRequest.next()) - : Mono.empty() + // forever loop first page until no more to delete + return listCommentsByRef(subjectRef, pageRequest) + .flatMap(page -> Flux.fromIterable(page.getItems()) + .flatMap(this::deleteWithRetry) + .then(page.hasNext() ? cleanupComments(subjectRef, batchSize) : Mono.empty()) + ); + } + + private Mono deleteWithRetry(Comment item) { + return client.delete(item) + .onErrorResume(OptimisticLockingFailureException.class, + e -> attemptToDelete(item.getMetadata().getName())); + } + + private Mono attemptToDelete(String name) { + return Mono.defer(() -> client.fetch(Comment.class, name) + .flatMap(client::delete) ) - .flatMap(page -> Flux.fromIterable(page.getItems())) - .flatMap(client::delete) - .then(); + .retryWhen(Retry.backoff(8, Duration.ofMillis(100)) + .filter(OptimisticLockingFailureException.class::isInstance)); } Mono> listCommentsByRef(Ref subjectRef, PageRequest pageRequest) { diff --git a/application/src/main/java/run/halo/app/content/comment/ReplyServiceImpl.java b/application/src/main/java/run/halo/app/content/comment/ReplyServiceImpl.java index 65f6ee14df..cb86677d02 100644 --- a/application/src/main/java/run/halo/app/content/comment/ReplyServiceImpl.java +++ b/application/src/main/java/run/halo/app/content/comment/ReplyServiceImpl.java @@ -5,18 +5,21 @@ import static run.halo.app.extension.index.query.QueryFactory.isNull; import static run.halo.app.extension.router.selector.SelectorUtil.labelAndFieldSelectorToPredicate; +import java.time.Duration; import java.time.Instant; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.BooleanUtils; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.data.domain.Sort; import org.springframework.security.core.context.ReactiveSecurityContextHolder; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import run.halo.app.core.extension.User; import run.halo.app.core.extension.content.Comment; import run.halo.app.core.extension.content.Reply; @@ -118,17 +121,33 @@ public Mono> list(ReplyQuery query) { @Override public Mono removeAllByComment(String commentName) { Assert.notNull(commentName, "The commentName must not be null."); + return cleanupComments(commentName, 200); + } + + private Mono cleanupComments(String commentName, int batchSize) { // ascending order by creation time and name - var pageRequest = PageRequestImpl.of(1, 200, + final var pageRequest = PageRequestImpl.of(1, batchSize, Sort.by("metadata.creationTimestamp", "metadata.name")); - return Flux.defer(() -> listRepliesByComment(commentName, pageRequest)) - .expand(page -> page.hasNext() - ? listRepliesByComment(commentName, pageRequest.next()) - : Mono.empty() + // forever loop first page until no more to delete + return listRepliesByComment(commentName, pageRequest) + .flatMap(page -> Flux.fromIterable(page.getItems()) + .flatMap(this::deleteWithRetry) + .then(page.hasNext() ? cleanupComments(commentName, batchSize) : Mono.empty()) + ); + } + + private Mono deleteWithRetry(Reply item) { + return client.delete(item) + .onErrorResume(OptimisticLockingFailureException.class, + e -> attemptToDelete(item.getMetadata().getName())); + } + + private Mono attemptToDelete(String name) { + return Mono.defer(() -> client.fetch(Reply.class, name) + .flatMap(client::delete) ) - .flatMap(page -> Flux.fromIterable(page.getItems())) - .flatMap(client::delete) - .then(); + .retryWhen(Retry.backoff(8, Duration.ofMillis(100)) + .filter(OptimisticLockingFailureException.class::isInstance)); } Mono> listRepliesByComment(String commentName, PageRequest pageRequest) { diff --git a/application/src/test/java/run/halo/app/content/comment/CommentServiceImplIntegrationTest.java b/application/src/test/java/run/halo/app/content/comment/CommentServiceImplIntegrationTest.java new file mode 100644 index 0000000000..0b16005515 --- /dev/null +++ b/application/src/test/java/run/halo/app/content/comment/CommentServiceImplIntegrationTest.java @@ -0,0 +1,159 @@ +package run.halo.app.content.comment; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.annotation.DirtiesContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import run.halo.app.core.extension.content.Comment; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionStoreUtil; +import run.halo.app.extension.GroupVersionKind; +import run.halo.app.extension.PageRequestImpl; +import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.Ref; +import run.halo.app.extension.SchemeManager; +import run.halo.app.extension.index.IndexerFactory; +import run.halo.app.extension.store.ReactiveExtensionStoreClient; +import run.halo.app.infra.utils.JsonUtils; + +/** + * Integration tests for {@link CommentServiceImpl}. + * + * @author guqing + * @since 2.15.0 + */ +class CommentServiceImplIntegrationTest { + + @Nested + @DirtiesContext + @SpringBootTest + class CommentRemoveTest { + private final List storedComments = createComments(350); + + @Autowired + private SchemeManager schemeManager; + + @SpyBean + private ReactiveExtensionClient reactiveClient; + + @Autowired + private ReactiveExtensionStoreClient storeClient; + + @Autowired + private IndexerFactory indexerFactory; + + @SpyBean + private CommentServiceImpl commentService; + + Mono deleteImmediately(Extension extension) { + var name = extension.getMetadata().getName(); + var scheme = schemeManager.get(extension.getClass()); + // un-index + var indexer = indexerFactory.getIndexer(extension.groupVersionKind()); + indexer.unIndexRecord(extension.getMetadata().getName()); + + // delete from db + var storeName = ExtensionStoreUtil.buildStoreName(scheme, name); + return storeClient.delete(storeName, extension.getMetadata().getVersion()) + .thenReturn(extension); + } + + @BeforeEach + void setUp() { + Flux.fromIterable(storedComments) + .flatMap(post -> reactiveClient.create(post)) + .as(StepVerifier::create) + .expectNextCount(storedComments.size()) + .verifyComplete(); + } + + @AfterEach + void tearDown() { + Flux.fromIterable(storedComments) + .flatMap(this::deleteImmediately) + .as(StepVerifier::create) + .expectNextCount(storedComments.size()) + .verifyComplete(); + } + + @Test + void commentBatchDeletionTest() { + Ref ref = Ref.of("67", + GroupVersionKind.fromAPIVersionAndKind("content.halo.run/v1alpha1", "SinglePage")); + commentService.removeBySubject(ref) + .as(StepVerifier::create) + .verifyComplete(); + + verify(reactiveClient, times(storedComments.size())).delete(any(Comment.class)); + verify(commentService, times(2)).listCommentsByRef(eq(ref), any()); + + commentService.listCommentsByRef(ref, PageRequestImpl.ofSize(1)) + .as(StepVerifier::create) + .consumeNextWith(result -> { + assertThat(result.getTotal()).isEqualTo(0); + }) + .verifyComplete(); + } + + List createComments(int size) { + List comments = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + var comment = createComment(); + comment.getMetadata().setName("comment-" + i); + comments.add(comment); + } + return comments; + } + } + + Comment createComment() { + return JsonUtils.jsonToObject(""" + { + "spec": { + "raw": "fake-raw", + "content": "fake-content", + "owner": { + "kind": "User", + "name": "fake-user" + }, + "userAgent": "", + "ipAddress": "", + "approvedTime": "2024-02-28T09:15:16.095Z", + "creationTime": "2024-02-28T06:23:42.923294424Z", + "priority": 0, + "top": false, + "allowNotification": false, + "approved": true, + "hidden": false, + "subjectRef": { + "group": "content.halo.run", + "version": "v1alpha1", + "kind": "SinglePage", + "name": "67" + }, + "lastReadTime": "2024-02-29T03:39:04.230Z" + }, + "apiVersion": "content.halo.run/v1alpha1", + "kind": "Comment", + "metadata": { + "generateName": "comment-" + } + } + """, Comment.class); + } +} \ No newline at end of file diff --git a/application/src/test/java/run/halo/app/content/comment/ReplyServiceImplIntegrationTest.java b/application/src/test/java/run/halo/app/content/comment/ReplyServiceImplIntegrationTest.java new file mode 100644 index 0000000000..c2365bdf7a --- /dev/null +++ b/application/src/test/java/run/halo/app/content/comment/ReplyServiceImplIntegrationTest.java @@ -0,0 +1,152 @@ +package run.halo.app.content.comment; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.annotation.DirtiesContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import run.halo.app.core.extension.content.Reply; +import run.halo.app.extension.Extension; +import run.halo.app.extension.ExtensionStoreUtil; +import run.halo.app.extension.PageRequestImpl; +import run.halo.app.extension.ReactiveExtensionClient; +import run.halo.app.extension.SchemeManager; +import run.halo.app.extension.index.IndexerFactory; +import run.halo.app.extension.store.ReactiveExtensionStoreClient; +import run.halo.app.infra.utils.JsonUtils; + +/** + * Integration tests for {@link ReplyServiceImpl}. + * + * @author guqing + * @since 2.15.0 + */ +class ReplyServiceImplIntegrationTest { + + @Nested + @DirtiesContext + @SpringBootTest + class ReplyRemoveTest { + private final List storedReplies = createReplies(320); + + private List createReplies(int size) { + List replies = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + var reply = JsonUtils.jsonToObject(fakeReplyJson(), Reply.class); + reply.getMetadata().setName("reply-" + i); + replies.add(reply); + } + return replies; + } + + @Autowired + private SchemeManager schemeManager; + + @SpyBean + private ReactiveExtensionClient reactiveClient; + + @Autowired + private ReactiveExtensionStoreClient storeClient; + + @Autowired + private IndexerFactory indexerFactory; + + @SpyBean + private ReplyServiceImpl replyService; + + Mono deleteImmediately(Extension extension) { + var name = extension.getMetadata().getName(); + var scheme = schemeManager.get(extension.getClass()); + // un-index + var indexer = indexerFactory.getIndexer(extension.groupVersionKind()); + indexer.unIndexRecord(extension.getMetadata().getName()); + + // delete from db + var storeName = ExtensionStoreUtil.buildStoreName(scheme, name); + return storeClient.delete(storeName, extension.getMetadata().getVersion()) + .thenReturn(extension); + } + + @BeforeEach + void setUp() { + Flux.fromIterable(storedReplies) + .flatMap(post -> reactiveClient.create(post)) + .as(StepVerifier::create) + .expectNextCount(storedReplies.size()) + .verifyComplete(); + } + + @AfterEach + void tearDown() { + Flux.fromIterable(storedReplies) + .flatMap(this::deleteImmediately) + .as(StepVerifier::create) + .expectNextCount(storedReplies.size()) + .verifyComplete(); + } + + @Test + void removeAllByComment() { + String commentName = "fake-comment"; + replyService.removeAllByComment(commentName) + .as(StepVerifier::create) + .verifyComplete(); + + verify(reactiveClient, times(storedReplies.size())).delete(any(Reply.class)); + verify(replyService, times(2)).listRepliesByComment(eq(commentName), any()); + + replyService.listRepliesByComment(commentName, PageRequestImpl.ofSize(1)) + .as(StepVerifier::create) + .consumeNextWith(result -> assertThat(result.getTotal()).isEqualTo(0)) + .verifyComplete(); + } + } + + String fakeReplyJson() { + return """ + { + "metadata":{ + "name":"fake-reply" + }, + "spec":{ + "raw":"fake-raw", + "content":"fake-content", + "owner":{ + "kind":"User", + "name":"fake-user", + "displayName":"fake-display-name" + }, + "creationTime": "2024-03-11T06:23:42.923294424Z", + "ipAddress":"", + "approved": true, + "hidden": false, + "allowNotification": false, + "top": false, + "priority": 0, + "commentName":"fake-comment" + }, + "owner":{ + "kind":"User", + "displayName":"fake-display-name" + }, + "stats":{ + "upvote":0 + } + } + """; + } +} \ No newline at end of file