diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 2bbe08ad0b..c821e4491d 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -26,7 +26,8 @@ import org.springframework.data.domain.Sort; import org.springframework.data.util.Predicates; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -40,7 +41,6 @@ @Slf4j @Component -@RequiredArgsConstructor public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ReactiveExtensionStoreClient client; @@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ConcurrentMap indexBuildingState = new ConcurrentHashMap<>(); + private TransactionalOperator transactionalOperator; + + public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client, + ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper, + IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine, + ReactiveTransactionManager reactiveTransactionManager) { + this.client = client; + this.converter = converter; + this.schemeManager = schemeManager; + this.objectMapper = objectMapper; + this.indexerFactory = indexerFactory; + this.indexedQueryEngine = indexedQueryEngine; + this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager); + } + + /** + * Only for test. + */ + void setTransactionalOperator(TransactionalOperator transactionalOperator) { + this.transactionalOperator = transactionalOperator; + } + @Override public Flux list(Class type, Predicate predicate, Comparator comparator) { @@ -151,7 +173,6 @@ private Mono get(GroupVersionKind gvk, String name) { } @Override - @Transactional public Mono create(E extension) { checkClientWritable(extension); return Mono.just(extension) @@ -185,7 +206,6 @@ && hasText(extension.getMetadata().getGenerateName())) } @Override - @Transactional public Mono update(E extension) { checkClientWritable(extension); // Refactor the atomic reference if we have a better solution. @@ -223,7 +243,6 @@ private Mono getLatest(Extension extension) { } @Override - @Transactional public Mono delete(E extension) { checkClientWritable(extension); // set deletionTimestamp @@ -247,7 +266,8 @@ Mono doCreate(E oldExtension, String name, byte[] data) var indexer = indexerFactory.getIndexer(gvk); return client.create(name, data) .map(created -> converter.convertFrom(type, created)) - .doOnNext(indexer::indexRecord); + .doOnNext(indexer::indexRecord) + .as(transactionalOperator::transactional); }); } @@ -258,7 +278,8 @@ Mono doUpdate(E oldExtension, String name, Long version var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); return client.update(name, version, data) .map(updated -> converter.convertFrom(type, updated)) - .doOnNext(indexer::updateRecord); + .doOnNext(indexer::updateRecord) + .as(transactionalOperator::transactional); }); } diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index dc7cd9528b..c46e37700c 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -36,6 +36,8 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,6 +65,9 @@ class ReactiveExtensionClientTest { @Mock IndexerFactory indexerFactory; + @Mock + ReactiveTransactionManager reactiveTransactionManager; + @Spy ObjectMapper objectMapper = JsonMapper.builder() .addModule(new JavaTimeModule()) @@ -76,6 +81,10 @@ void setUp() { lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); + var transactionalOperator = mock(TransactionalOperator.class); + client.setTransactionalOperator(transactionalOperator); + lenient().when(transactionalOperator.transactional(any(Mono.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); } FakeExtension createFakeExtension(String name, Long version) {