diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java index 1dfc2233..2ebfc866 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java @@ -159,7 +159,6 @@ private static StreamProcessor createStreamProcessor( new SubscriptionCommandSender(context.getPartitionId(), commandSender), new DeploymentDistributionCommandSender( context.getPartitionId(), commandSender), - jobType -> {}, FeatureFlags.createDefault())))) .actorSchedulingService(scheduler) .build(); diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index e1216a90..a4aa987a 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -11,6 +11,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BrokerInfo; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse; @@ -63,6 +65,7 @@ import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationVariableInstruction; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord; +import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord; import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord; import io.camunda.zeebe.protocol.record.RecordType; import io.camunda.zeebe.protocol.record.ValueType; @@ -76,6 +79,7 @@ import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent; import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent; +import io.camunda.zeebe.protocol.record.intent.SignalIntent; import io.camunda.zeebe.protocol.record.intent.VariableDocumentIntent; import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic; import io.camunda.zeebe.util.VersionUtil; @@ -481,6 +485,24 @@ public void deleteResource( .intent(ResourceDeletionIntent.DELETE)); } + @Override + public void broadcastSignal( + final BroadcastSignalRequest request, + final StreamObserver responseObserver) { + final var requestId = + gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); + + writer.writeCommandWithoutKey( + new SignalRecord() + .setSignalName(request.getSignalName()) + .setVariables( + BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))), + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.SIGNAL) + .intent(SignalIntent.BROADCAST)); + } + private ProcessInstanceModificationRecord createProcessInstanceModificationRecord( final ModifyProcessInstanceRequest request) { final var record = new ProcessInstanceModificationRecord();