Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 301 to stable/8.0 #312

Merged
merged 8 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CreateProcessInstanceWithResultResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DecisionRequirementsMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployProcessResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.DeployResourceResponse.Builder;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.PublishMessageResponse;
Expand All @@ -42,7 +46,6 @@
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
Expand All @@ -51,29 +54,17 @@

class GrpcResponseWriter implements CommandResponseWriter {

private static long key = -1;
private static final DirectBuffer valueBufferView = new UnsafeBuffer();
private static Intent intent = Intent.UNKNOWN;
final GrpcToLogStreamGateway gateway;
private int partitionId = -1;
private long key = -1;
private Intent intent = Intent.UNKNOWN;
private RecordType recordType = RecordType.NULL_VAL;
private ValueType valueType = ValueType.NULL_VAL;
private RejectionType rejectionType = RejectionType.NULL_VAL;
private String rejectionReason = "";
private final DirectBuffer valueBufferView = new UnsafeBuffer();
private final MutableDirectBuffer valueBuffer = new ExpandableArrayBuffer();

private final Map<ValueType, Callable<GeneratedMessageV3>> responseMap =
Map.of(
ValueType.DEPLOYMENT, this::createDeployResponse,
ValueType.PROCESS_INSTANCE_CREATION, this::createProcessInstanceResponse,
ValueType.PROCESS_INSTANCE_RESULT, this::createProcessInstanceWithResultResponse,
ValueType.PROCESS_INSTANCE, this::createCancelInstanceResponse,
ValueType.INCIDENT, this::createResolveIncidentResponse,
ValueType.VARIABLE_DOCUMENT, this::createSetVariablesResponse,
ValueType.MESSAGE, this::createMessageResponse,
ValueType.JOB_BATCH, this::createJobBatchResponse,
ValueType.JOB, this::createJobResponse);

public GrpcResponseWriter(final GrpcToLogStreamGateway gateway) {
this.gateway = gateway;
}
Expand All @@ -86,13 +77,13 @@ public CommandResponseWriter partitionId(final int partitionId) {

@Override
public CommandResponseWriter key(final long key) {
this.key = key;
GrpcResponseWriter.key = key;
return this;
}

@Override
public CommandResponseWriter intent(final Intent intent) {
this.intent = intent;
GrpcResponseWriter.intent = intent;
return this;
}

Expand Down Expand Up @@ -136,15 +127,15 @@ public boolean tryWriteResponse(final int requestStreamId, final long requestId)
}

try {
final GeneratedMessageV3 response = responseMap.get(valueType).call();
gateway.responseCallback(requestId, response);
gateway.responseCallback(requestId);
return true;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private DeployProcessResponse createDeployResponse() {
@Deprecated(since = "8.0.0")
static DeployProcessResponse createDeployResponse() {
final DeploymentRecord deployment = new DeploymentRecord();
deployment.wrap(valueBufferView);

Expand All @@ -164,7 +155,51 @@ private DeployProcessResponse createDeployResponse() {
.build();
}

private GeneratedMessageV3 createProcessInstanceResponse() {
static GeneratedMessageV3 createDeployResourceResponse() {
final DeploymentRecord deployment = new DeploymentRecord();
deployment.wrap(valueBufferView);

final Builder builder = DeployResourceResponse.newBuilder().setKey(key);
deployment.getProcessesMetadata().stream()
.map(
metadata ->
ProcessMetadata.newBuilder()
.setBpmnProcessId(metadata.getBpmnProcessId())
.setVersion(metadata.getVersion())
.setProcessDefinitionKey(metadata.getProcessDefinitionKey())
.setResourceName(metadata.getResourceName())
.build())
.forEach(metadata -> builder.addDeploymentsBuilder().setProcess(metadata));

deployment.decisionsMetadata().stream()
.map(
metadata ->
DecisionMetadata.newBuilder()
.setDmnDecisionId(metadata.getDecisionId())
.setDmnDecisionName(metadata.getDecisionName())
.setVersion(metadata.getVersion())
.setDecisionKey(metadata.getDecisionKey())
.setDmnDecisionRequirementsId(metadata.getDecisionRequirementsId())
.setDecisionRequirementsKey(metadata.getDecisionRequirementsKey())
.build())
.forEach(metadata -> builder.addDeploymentsBuilder().setDecision(metadata));

deployment.decisionRequirementsMetadata().stream()
.map(
metadata ->
DecisionRequirementsMetadata.newBuilder()
.setDmnDecisionRequirementsId(metadata.getDecisionRequirementsId())
.setDmnDecisionRequirementsName(metadata.getDecisionRequirementsName())
.setVersion(metadata.getDecisionRequirementsVersion())
.setDecisionRequirementsKey(metadata.getDecisionRequirementsKey())
.setResourceName(metadata.getResourceName())
.build())
.forEach(metadata -> builder.addDeploymentsBuilder().setDecisionRequirements(metadata));

return builder.build();
}

static GeneratedMessageV3 createProcessInstanceResponse() {
final ProcessInstanceCreationRecord processInstance = new ProcessInstanceCreationRecord();
processInstance.wrap(valueBufferView);

Expand All @@ -176,7 +211,7 @@ private GeneratedMessageV3 createProcessInstanceResponse() {
.build();
}

private GeneratedMessageV3 createProcessInstanceWithResultResponse() {
static GeneratedMessageV3 createProcessInstanceWithResultResponse() {
final ProcessInstanceResultRecord processInstanceResult = new ProcessInstanceResultRecord();
processInstanceResult.wrap(valueBufferView);

Expand All @@ -189,29 +224,29 @@ private GeneratedMessageV3 createProcessInstanceWithResultResponse() {
.build();
}

private GeneratedMessageV3 createCancelInstanceResponse() {
static GeneratedMessageV3 createCancelInstanceResponse() {
return CancelProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createResolveIncidentResponse() {
static GeneratedMessageV3 createResolveIncidentResponse() {
final IncidentRecord incident = new IncidentRecord();
incident.wrap(valueBufferView);

return ResolveIncidentResponse.newBuilder().build();
}

private GeneratedMessageV3 createSetVariablesResponse() {
static GeneratedMessageV3 createSetVariablesResponse() {
final VariableDocumentRecord variableDocumentRecord = new VariableDocumentRecord();
variableDocumentRecord.wrap(valueBufferView);

return SetVariablesResponse.newBuilder().setKey(key).build();
}

private GeneratedMessageV3 createMessageResponse() {
static GeneratedMessageV3 createMessageResponse() {
return PublishMessageResponse.newBuilder().setKey(key).build();
}

private GeneratedMessageV3 createJobBatchResponse() {
static GeneratedMessageV3 createJobBatchResponse() {
final JobBatchRecord jobBatch = new JobBatchRecord();
jobBatch.wrap(valueBufferView);

Expand Down Expand Up @@ -249,60 +284,43 @@ private GeneratedMessageV3 createJobBatchResponse() {
.build();
}

private GeneratedMessageV3 createCompleteJobResponse() {
static GeneratedMessageV3 createCompleteJobResponse() {
return CompleteJobResponse.newBuilder().build();
}

private GeneratedMessageV3 createFailJobResponse() {
static GeneratedMessageV3 createFailJobResponse() {
return FailJobResponse.newBuilder().build();
}

private GeneratedMessageV3 createJobThrowErrorResponse() {
static GeneratedMessageV3 createJobThrowErrorResponse() {
return ThrowErrorResponse.newBuilder().build();
}

private GeneratedMessageV3 createJobUpdateRetriesResponse() {
static GeneratedMessageV3 createJobUpdateRetriesResponse() {
return UpdateJobRetriesResponse.newBuilder().build();
}

private GeneratedMessageV3 createJobResponse() {
switch ((JobIntent) intent) {
case COMPLETED:
return createCompleteJobResponse();
case FAILED:
return createFailJobResponse();
case ERROR_THROWN:
return createJobThrowErrorResponse();
case RETRIES_UPDATED:
return createJobUpdateRetriesResponse();
default:
throw new UnsupportedOperationException(
String.format("Job command '%s' is not supported", intent));
}
static GeneratedMessageV3 createJobResponse() {
return switch ((JobIntent) intent) {
case COMPLETED -> createCompleteJobResponse();
case FAILED -> createFailJobResponse();
case ERROR_THROWN -> createJobThrowErrorResponse();
case RETRIES_UPDATED -> createJobUpdateRetriesResponse();
default -> throw new UnsupportedOperationException(
String.format("Job command '%s' is not supported", intent));
};
}

private Status createRejectionResponse() {
final int statusCode;
switch (rejectionType) {
case INVALID_ARGUMENT:
statusCode = Code.INVALID_ARGUMENT_VALUE;
break;
case NOT_FOUND:
statusCode = Code.NOT_FOUND_VALUE;
break;
case ALREADY_EXISTS:
statusCode = Code.ALREADY_EXISTS_VALUE;
break;
case INVALID_STATE:
statusCode = Code.FAILED_PRECONDITION_VALUE;
break;
case PROCESSING_ERROR:
statusCode = Code.INTERNAL_VALUE;
break;
default:
statusCode = Code.UNKNOWN_VALUE;
break;
}
final int statusCode =
switch (rejectionType) {
case INVALID_ARGUMENT -> Code.INVALID_ARGUMENT_VALUE;
case NOT_FOUND -> Code.NOT_FOUND_VALUE;
case ALREADY_EXISTS -> Code.ALREADY_EXISTS_VALUE;
case INVALID_STATE -> Code.FAILED_PRECONDITION_VALUE;
case PROCESSING_ERROR -> Code.INTERNAL_VALUE;
default -> Code.UNKNOWN_VALUE;
};

return Status.newBuilder()
.setMessage(
Expand All @@ -311,4 +329,9 @@ private Status createRejectionResponse() {
.setCode(statusCode)
.build();
}

@FunctionalInterface
public interface GrpcResponseMapper<GrpcResponseType extends GeneratedMessageV3> {
GrpcResponseType apply();
}
}
Loading