Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TestKit backend: except txMeta as Cypher types #1349

Merged
merged 4 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend.messages.requests;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.CustomDriverError;
import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer;
import org.neo4j.driver.TransactionConfig;

@Setter
@Getter
abstract class AbstractTestkitRequestWithTransactionConfig<
T extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody>
implements TestkitRequest {
protected T data;

protected TransactionConfig buildTxConfig() {
return configureTx(TransactionConfig.builder()).build();
}

private TransactionConfig.Builder configureTx(TransactionConfig.Builder builder) {
return configureTxMetadata(configureTxTimeout(builder));
}

private TransactionConfig.Builder configureTxMetadata(TransactionConfig.Builder builder) {
data.getTxMeta().ifPresent(builder::withMetadata);
return builder;
}

private TransactionConfig.Builder configureTxTimeout(TransactionConfig.Builder builder) {
try {
data.getTimeout().ifPresent((timeout) -> builder.withTimeout(Duration.ofMillis(timeout)));
} catch (IllegalArgumentException e) {
throw new CustomDriverError(e);
}
return builder;
}

@Setter
abstract static class TransactionConfigBody {
protected Integer timeout;

@JsonDeserialize(using = TestkitCypherParamDeserializer.class)
protected Map<String, Object> txMeta;

Optional<Integer> getTimeout() {
return Optional.ofNullable(timeout);
}

Optional<Map<String, Object>> getTxMeta() {
return Optional.ofNullable(txMeta);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@

import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.CustomDriverError;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.holder.AsyncTransactionHolder;
import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder;
Expand All @@ -43,35 +39,14 @@
import org.neo4j.driver.reactive.RxSession;
import reactor.core.publisher.Mono;

@Setter
@Getter
public class SessionBeginTransaction implements TestkitRequest {
private SessionBeginTransactionBody data;

private void configureTimeout(TransactionConfig.Builder builder) {
if (data.getTimeoutPresent()) {
try {
if (data.getTimeout() != null) {
builder.withTimeout(Duration.ofMillis(data.getTimeout()));
} else {
builder.withDefaultTimeout();
}
} catch (IllegalArgumentException e) {
throw new CustomDriverError(e);
}
}
}

public class SessionBeginTransaction
extends AbstractTestkitRequestWithTransactionConfig<SessionBeginTransaction.SessionBeginTransactionBody> {
@Override
public TestkitResponse process(TestkitState testkitState) {
SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId());
Session session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

org.neo4j.driver.Transaction transaction = session.beginTransaction(builder.build());
org.neo4j.driver.Transaction transaction = session.beginTransaction(buildTxConfig());
return transaction(testkitState.addTransactionHolder(new TransactionHolder(sessionHolder, transaction)));
}

Expand All @@ -80,11 +55,8 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
return testkitState.getAsyncSessionHolder(data.getSessionId()).thenCompose(sessionHolder -> {
AsyncSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return session.beginTransactionAsync(builder.build())
return session.beginTransactionAsync(buildTxConfig())
.thenApply(tx -> transaction(
testkitState.addAsyncTransactionHolder(new AsyncTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -96,11 +68,8 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
return testkitState.getRxSessionHolder(data.getSessionId()).flatMap(sessionHolder -> {
RxSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(session.beginTransaction(builder.build()))
return Mono.fromDirect(session.beginTransaction(buildTxConfig()))
.map(tx -> transaction(
testkitState.addRxTransactionHolder(new RxTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -111,11 +80,8 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
return testkitState.getReactiveSessionHolder(data.getSessionId()).flatMap(sessionHolder -> {
ReactiveSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(builder.build())))
return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(buildTxConfig())))
.map(tx -> transaction(testkitState.addReactiveTransactionHolder(
new ReactiveTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -126,11 +92,8 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> {
var session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(session.beginTransaction(builder.build()))
return Mono.fromDirect(session.beginTransaction(buildTxConfig()))
.map(tx -> transaction(testkitState.addReactiveTransactionStreamsHolder(
new ReactiveTransactionStreamsHolder(sessionHolder, tx))));
});
Expand All @@ -144,15 +107,8 @@ private Transaction transaction(String txId) {

@Getter
@Setter
public static class SessionBeginTransactionBody {
public static class SessionBeginTransactionBody
extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody {
private String sessionId;
private Map<String, Object> txMeta;
private Integer timeout;
private Boolean timeoutPresent = false;

public void setTimeout(Integer timeout) {
this.timeout = timeout;
timeoutPresent = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

@Setter
@Getter
public class SessionReadTransaction implements TestkitRequest {
private SessionReadTransactionBody data;

public class SessionReadTransaction
extends AbstractTestkitRequestWithTransactionConfig<SessionReadTransaction.SessionReadTransactionBody> {
@Override
@SuppressWarnings("deprecation")
public TestkitResponse process(TestkitState testkitState) {
SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId());
Session session = sessionHolder.getSession();
session.readTransaction(handle(testkitState, sessionHolder));
session.readTransaction(handle(testkitState, sessionHolder), buildTxConfig());
return retryableDone();
}

Expand All @@ -78,7 +75,7 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
return txWorkFuture;
};

return session.readTransactionAsync(workWrapper);
return session.readTransactionAsync(workWrapper, buildTxConfig());
})
.thenApply(nothing -> retryableDone());
}
Expand All @@ -97,7 +94,7 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
return Mono.fromCompletionStage(tryResult);
};

return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper));
return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper, buildTxConfig()));
})
.then(Mono.just(retryableDone()));
}
Expand All @@ -117,7 +114,7 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
};

return Mono.fromDirect(
flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper)));
flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper, buildTxConfig())));
})
.then(Mono.just(retryableDone()));
}
Expand All @@ -137,7 +134,7 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
return Mono.fromCompletionStage(tryResult);
};

return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper));
return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper, buildTxConfig()));
})
.then(Mono.just(retryableDone()));
}
Expand Down Expand Up @@ -177,7 +174,8 @@ private RetryableDone retryableDone() {

@Setter
@Getter
public static class SessionReadTransactionBody {
public static class SessionReadTransactionBody
extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody {
private String sessionId;
}
}
Loading