Skip to content

Commit

Permalink
[release-2.12] fix: failure in data query due to reconciler triggered…
Browse files Browse the repository at this point in the history
… by uncommitted transaction (#5329)



Co-authored-by: guqing <i@guqing.email>
  • Loading branch information
halo-dev-bot and guqing authored Feb 5, 2024
1 parent c71b2ec commit e18db4d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import org.springframework.data.domain.Sort;
import org.springframework.data.util.Predicates;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
Expand All @@ -40,7 +41,6 @@

@Slf4j
@Component
@RequiredArgsConstructor
public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {

private final ReactiveExtensionStoreClient client;
Expand All @@ -60,6 +60,28 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private final ConcurrentMap<GroupKind, AtomicBoolean> indexBuildingState =
new ConcurrentHashMap<>();

private TransactionalOperator transactionalOperator;

public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client,
ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper,
IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine,
ReactiveTransactionManager reactiveTransactionManager) {
this.client = client;
this.converter = converter;
this.schemeManager = schemeManager;
this.objectMapper = objectMapper;
this.indexerFactory = indexerFactory;
this.indexedQueryEngine = indexedQueryEngine;
this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager);
}

/**
* Only for test.
*/
void setTransactionalOperator(TransactionalOperator transactionalOperator) {
this.transactionalOperator = transactionalOperator;
}

@Override
public <E extends Extension> Flux<E> list(Class<E> type, Predicate<E> predicate,
Comparator<E> comparator) {
Expand Down Expand Up @@ -151,7 +173,6 @@ private Mono<Unstructured> get(GroupVersionKind gvk, String name) {
}

@Override
@Transactional
public <E extends Extension> Mono<E> create(E extension) {
checkClientWritable(extension);
return Mono.just(extension)
Expand Down Expand Up @@ -185,7 +206,6 @@ && hasText(extension.getMetadata().getGenerateName()))
}

@Override
@Transactional
public <E extends Extension> Mono<E> update(E extension) {
checkClientWritable(extension);
// Refactor the atomic reference if we have a better solution.
Expand Down Expand Up @@ -223,7 +243,6 @@ private Mono<? extends Extension> getLatest(Extension extension) {
}

@Override
@Transactional
public <E extends Extension> Mono<E> delete(E extension) {
checkClientWritable(extension);
// set deletionTimestamp
Expand All @@ -247,7 +266,8 @@ <E extends Extension> Mono<E> doCreate(E oldExtension, String name, byte[] data)
var indexer = indexerFactory.getIndexer(gvk);
return client.create(name, data)
.map(created -> converter.convertFrom(type, created))
.doOnNext(indexer::indexRecord);
.doOnNext(indexer::indexRecord)
.as(transactionalOperator::transactional);
});
}

Expand All @@ -258,7 +278,8 @@ <E extends Extension> Mono<E> doUpdate(E oldExtension, String name, Long version
var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind());
return client.update(name, version, data)
.map(updated -> converter.convertFrom(type, updated))
.doOnNext(indexer::updateRecord);
.doOnNext(indexer::updateRecord)
.as(transactionalOperator::transactional);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -63,6 +65,9 @@ class ReactiveExtensionClientTest {
@Mock
IndexerFactory indexerFactory;

@Mock
ReactiveTransactionManager reactiveTransactionManager;

@Spy
ObjectMapper objectMapper = JsonMapper.builder()
.addModule(new JavaTimeModule())
Expand All @@ -76,6 +81,10 @@ void setUp() {
lenient().when(schemeManager.get(eq(FakeExtension.class)))
.thenReturn(fakeScheme);
lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme);
var transactionalOperator = mock(TransactionalOperator.class);
client.setTransactionalOperator(transactionalOperator);
lenient().when(transactionalOperator.transactional(any(Mono.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
}

FakeExtension createFakeExtension(String name, Long version) {
Expand Down

0 comments on commit e18db4d

Please sign in to comment.