Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feat/post-snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
ruibaby committed Apr 26, 2024
2 parents e11076e + c0de807 commit aa862d8
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
import static run.halo.app.extension.index.query.QueryFactory.equal;
import static run.halo.app.extension.index.query.QueryFactory.isNull;

import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.function.Function;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Sort;
import org.springframework.lang.NonNull;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.core.extension.service.RoleService;
Expand Down Expand Up @@ -147,17 +150,33 @@ public Mono<Comment> create(Comment comment) {
@Override
public Mono<Void> removeBySubject(@NonNull Ref subjectRef) {
Assert.notNull(subjectRef, "The subjectRef must not be null.");
return cleanupComments(subjectRef, 200);
}

private Mono<Void> cleanupComments(Ref subjectRef, int batchSize) {
// ascending order by creation time and name
var pageRequest = PageRequestImpl.of(1, 200,
final var pageRequest = PageRequestImpl.of(1, batchSize,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> listCommentsByRef(subjectRef, pageRequest))
.expand(page -> page.hasNext()
? listCommentsByRef(subjectRef, pageRequest.next())
: Mono.empty()
// forever loop first page until no more to delete
return listCommentsByRef(subjectRef, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(this::deleteWithRetry)
.then(page.hasNext() ? cleanupComments(subjectRef, batchSize) : Mono.empty())
);
}

private Mono<Comment> deleteWithRetry(Comment item) {
return client.delete(item)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete(item.getMetadata().getName()));
}

private Mono<Comment> attemptToDelete(String name) {
return Mono.defer(() -> client.fetch(Comment.class, name)
.flatMap(client::delete)
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}

Mono<ListResult<Comment>> listCommentsByRef(Ref subjectRef, PageRequest pageRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
import static run.halo.app.extension.index.query.QueryFactory.isNull;
import static run.halo.app.extension.router.selector.SelectorUtil.labelAndFieldSelectorToPredicate;

import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.domain.Sort;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.User;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.core.extension.content.Reply;
Expand Down Expand Up @@ -118,17 +121,33 @@ public Mono<ListResult<ListedReply>> list(ReplyQuery query) {
@Override
public Mono<Void> removeAllByComment(String commentName) {
Assert.notNull(commentName, "The commentName must not be null.");
return cleanupComments(commentName, 200);
}

private Mono<Void> cleanupComments(String commentName, int batchSize) {
// ascending order by creation time and name
var pageRequest = PageRequestImpl.of(1, 200,
final var pageRequest = PageRequestImpl.of(1, batchSize,
Sort.by("metadata.creationTimestamp", "metadata.name"));
return Flux.defer(() -> listRepliesByComment(commentName, pageRequest))
.expand(page -> page.hasNext()
? listRepliesByComment(commentName, pageRequest.next())
: Mono.empty()
// forever loop first page until no more to delete
return listRepliesByComment(commentName, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(this::deleteWithRetry)
.then(page.hasNext() ? cleanupComments(commentName, batchSize) : Mono.empty())
);
}

private Mono<Reply> deleteWithRetry(Reply item) {
return client.delete(item)
.onErrorResume(OptimisticLockingFailureException.class,
e -> attemptToDelete(item.getMetadata().getName()));
}

private Mono<Reply> attemptToDelete(String name) {
return Mono.defer(() -> client.fetch(Reply.class, name)
.flatMap(client::delete)
)
.flatMap(page -> Flux.fromIterable(page.getItems()))
.flatMap(client::delete)
.then();
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}

Mono<ListResult<Reply>> listRepliesByComment(String commentName, PageRequest pageRequest) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package run.halo.app.content.comment;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.content.Comment;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ExtensionStoreUtil;
import run.halo.app.extension.GroupVersionKind;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.Ref;
import run.halo.app.extension.SchemeManager;
import run.halo.app.extension.index.IndexerFactory;
import run.halo.app.extension.store.ReactiveExtensionStoreClient;
import run.halo.app.infra.utils.JsonUtils;

/**
* Integration tests for {@link CommentServiceImpl}.
*
* @author guqing
* @since 2.15.0
*/
class CommentServiceImplIntegrationTest {

@Nested
@DirtiesContext
@SpringBootTest
class CommentRemoveTest {
private final List<Comment> storedComments = createComments(350);

@Autowired
private SchemeManager schemeManager;

@SpyBean
private ReactiveExtensionClient reactiveClient;

@Autowired
private ReactiveExtensionStoreClient storeClient;

@Autowired
private IndexerFactory indexerFactory;

@SpyBean
private CommentServiceImpl commentService;

Mono<Extension> deleteImmediately(Extension extension) {
var name = extension.getMetadata().getName();
var scheme = schemeManager.get(extension.getClass());
// un-index
var indexer = indexerFactory.getIndexer(extension.groupVersionKind());
indexer.unIndexRecord(extension.getMetadata().getName());

// delete from db
var storeName = ExtensionStoreUtil.buildStoreName(scheme, name);
return storeClient.delete(storeName, extension.getMetadata().getVersion())
.thenReturn(extension);
}

@BeforeEach
void setUp() {
Flux.fromIterable(storedComments)
.flatMap(post -> reactiveClient.create(post))
.as(StepVerifier::create)
.expectNextCount(storedComments.size())
.verifyComplete();
}

@AfterEach
void tearDown() {
Flux.fromIterable(storedComments)
.flatMap(this::deleteImmediately)
.as(StepVerifier::create)
.expectNextCount(storedComments.size())
.verifyComplete();
}

@Test
void commentBatchDeletionTest() {
Ref ref = Ref.of("67",
GroupVersionKind.fromAPIVersionAndKind("content.halo.run/v1alpha1", "SinglePage"));
commentService.removeBySubject(ref)
.as(StepVerifier::create)
.verifyComplete();

verify(reactiveClient, times(storedComments.size())).delete(any(Comment.class));
verify(commentService, times(2)).listCommentsByRef(eq(ref), any());

commentService.listCommentsByRef(ref, PageRequestImpl.ofSize(1))
.as(StepVerifier::create)
.consumeNextWith(result -> {
assertThat(result.getTotal()).isEqualTo(0);
})
.verifyComplete();
}

List<Comment> createComments(int size) {
List<Comment> comments = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
var comment = createComment();
comment.getMetadata().setName("comment-" + i);
comments.add(comment);
}
return comments;
}
}

Comment createComment() {
return JsonUtils.jsonToObject("""
{
"spec": {
"raw": "fake-raw",
"content": "fake-content",
"owner": {
"kind": "User",
"name": "fake-user"
},
"userAgent": "",
"ipAddress": "",
"approvedTime": "2024-02-28T09:15:16.095Z",
"creationTime": "2024-02-28T06:23:42.923294424Z",
"priority": 0,
"top": false,
"allowNotification": false,
"approved": true,
"hidden": false,
"subjectRef": {
"group": "content.halo.run",
"version": "v1alpha1",
"kind": "SinglePage",
"name": "67"
},
"lastReadTime": "2024-02-29T03:39:04.230Z"
},
"apiVersion": "content.halo.run/v1alpha1",
"kind": "Comment",
"metadata": {
"generateName": "comment-"
}
}
""", Comment.class);
}
}
Loading

0 comments on commit aa862d8

Please sign in to comment.