Skip to content

Commit

Permalink
task: adding ciemss name to ciemss mq related (#1847)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Szendrey authored Sep 11, 2023
1 parent 98e9565 commit 0f13f51
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ const getStatus = async (simulationId: string) => {
// open a connection for each run id and handle the messages
runIds.forEach((id) => {
eventSourceManager.openConnection(id, `/simulations/${id}/partial-result`);
eventSourceManager.openConnection(id, `/simulations/${id}/ciemss/partial-result`);
eventSourceManager.setMessageHandler(id, handlingProgress);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@Data
@Accessors(chain = true)
public class SimulationIntermediateResults implements Serializable {
public class SimulationIntermediateResultsCiemss implements Serializable {
@JsonAlias("job_id")
private String jobId;
private Double progress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import software.uncharted.terarium.hmiserver.proxies.dataservice.SimulationProxy;
import software.uncharted.terarium.hmiserver.resources.SnakeCaseResource;
import software.uncharted.terarium.hmiserver.utils.Converter;
import software.uncharted.terarium.hmiserver.models.SimulationIntermediateResults;
import software.uncharted.terarium.hmiserver.models.SimulationIntermediateResultsCiemss;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Emitter;
Expand Down Expand Up @@ -55,11 +55,11 @@ public class SimulationResource implements SnakeCaseResource {

//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
@Inject
@Channel("simulationStatus") Publisher<byte[]> partialSimulationStream;
@Channel("simulationStatus") Publisher<byte[]> simulationStatusStream;

@Broadcast
@Channel("simulationStatus")
Emitter<SimulationIntermediateResults> partialSimulationEmitter;
Emitter<SimulationIntermediateResultsCiemss> simulationStatusEmitter;

@POST
public Simulation createSimulation(final Simulation simulation){
Expand Down Expand Up @@ -160,26 +160,26 @@ public Response createFromSimulationResult(
}

@GET
@Path("/{jobId}/partial-result")
@Path("/{jobId}/ciemss/partial-result")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
@Tag(name = "Stream partial/intermediate simulation result associated with run ID")
public Publisher<byte[]> stream(
@PathParam("jobId") final String jobId
) {
ObjectMapper mapper = new ObjectMapper();
return Multi.createFrom().publisher(partialSimulationStream).filter(event -> {
return Multi.createFrom().publisher(simulationStatusStream).filter(event -> {
try{
//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
String jsonString = new String(event);
jsonString = jsonString.replace(" ","");

SimulationIntermediateResults interResult = mapper.readValue(jsonString, SimulationIntermediateResults.class);
SimulationIntermediateResultsCiemss interResult = mapper.readValue(jsonString, SimulationIntermediateResultsCiemss.class);

return interResult.getJobId().equals(jobId);
}
catch(Exception e){
log.error("Error occured while trying to convert simulation-status message to type: SimulationIntermediateResults");
log.error("Error occured while trying to convert simulation-status message to type: SimulationIntermediateResultsCiemss");
log.error(event.toString());
log.error(e.toString());
return false;
Expand All @@ -190,17 +190,17 @@ public Publisher<byte[]> stream(
// When we finalize the SimulationIntermediateResults object this end point will need to be passed more parameters
//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
@PUT
@Path("/{jobId}/create-partial-result")
@Path("/{jobId}/ciemss/create-partial-result")
@Produces(MediaType.APPLICATION_JSON)
@Tag(name = "Used to write to the simulation status channel providing a job ID")
public Response createPartialResult(
@PathParam("jobId") final String jobId
) {
Double progress = 0.01;
SimulationIntermediateResults event = new SimulationIntermediateResults();
SimulationIntermediateResultsCiemss event = new SimulationIntermediateResultsCiemss();
event.setJobId(jobId);
event.setProgress(progress);
partialSimulationEmitter.send(event);
simulationStatusEmitter.send(event);
return Response.ok().build();
}
}

0 comments on commit 0f13f51

Please sign in to comment.