Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task]: Ts/1024 calibrate tool read from mq #1758

Merged
merged 22 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
96b93a3
need to remove from test operation
Tom-Szendrey Aug 10, 2023
6063d32
fix filter
Tom-Szendrey Aug 10, 2023
fb7af8b
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 10, 2023
df1e803
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 11, 2023
3a06f2d
going back to orchestration
Tom-Szendrey Aug 14, 2023
626e914
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 14, 2023
9de28b7
filtering not working
Tom-Szendrey Aug 14, 2023
b6c82f9
Need some cleanup now!
Tom-Szendrey Aug 14, 2023
a7b2468
Merge branch 'main' into TS/1024-calibrate-tool-read-from-mq
YohannParis Aug 15, 2023
383b57f
removing manual testing, will add a test file next
Tom-Szendrey Aug 15, 2023
9d80e46
Merge branch 'TS/1024-calibrate-tool-read-from-mq' of https://github.…
Tom-Szendrey Aug 15, 2023
68cabc4
leftovers
Tom-Szendrey Aug 15, 2023
9d63441
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 15, 2023
94fecc1
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 15, 2023
97818ca
Confirming lombok works and we're not going crazy
dvince2 Aug 15, 2023
e258fba
removing explicits
Tom-Szendrey Aug 15, 2023
74c14d3
Merge branch 'TS/1024-calibrate-tool-read-from-mq' of https://github.…
Tom-Szendrey Aug 15, 2023
cbbce40
Filtering out from MQ
Tom-Szendrey Aug 23, 2023
915fb00
Merge branch 'main' of https://github.com/DARPA-ASKEM/TERArium into T…
Tom-Szendrey Aug 23, 2023
8d21b3a
cleanup
Tom-Szendrey Aug 23, 2023
1874b38
cleanup
Tom-Szendrey Aug 23, 2023
15e9405
cleanup
Tom-Szendrey Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ import { ChartConfig, RunResults } from '@/types/SimulateConfig';
import { workflowEventBus } from '@/services/workflow';
import _ from 'lodash';
import { Poller, PollerState } from '@/api/api';
import useAuthStore from '@/stores/auth';
import { EventSourcePolyfill } from 'event-source-polyfill';
import {
CalibrationOperationCiemss,
CalibrationOperationStateCiemss,
Expand Down Expand Up @@ -237,20 +239,49 @@ const runCalibrate = async () => {
};
const response = await makeCalibrateJobCiemss(calibrationRequest);

// Run MQ Test
console.log(`Checking MQ for job Id ${response.simulationId}`);
const auth = useAuthStore();
const eventsource = new EventSourcePolyfill(
`/api/simulations/${response.simulationId}/partial-result`,
Tom-Szendrey marked this conversation as resolved.
Show resolved Hide resolved
{
headers: {
Authorization: `Bearer ${auth.token}`
}
}
);
eventsource.onerror = (e) => {
console.log('An error occurred while attempting to connect.');
console.log(e);
eventsource.close();
console.log('Closing eventsource');
};
eventsource.onmessage = (event) => {
console.log(event);
const json = event.data;
console.log(json);
eventsource.close();
console.log('Closing eventsource');
};
if (response?.simulationId) {
getStatus(response.simulationId);
}
};

const getStatus = async (simulationId: string) => {
console.log('Getting status');
showSpinner.value = true;
if (!simulationId) return;

if (!simulationId) {
console.log('No sim id');
return;
}
console.log(`Simulation Id:${simulationId}`);
const runIds = [simulationId];
poller
.setInterval(3000)
.setThreshold(300)
.setPollAction(async () => simulationPollAction(runIds, props.node, progress, emit));
console.log('Poller defined');
const pollerResults = await poller.start();

if (pollerResults.state !== PollerState.Done || !pollerResults.data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,10 @@
public class SimulationIntermediateResults implements Serializable {
@JsonAlias("job_id")
private String jobId;
private float progress;
private Double progress;

@Override
public String toString(){
return "{job_id: '" + this.jobId + "', progress: " + Double.toString(this.progress) + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Publisher;
import org.jboss.resteasy.annotations.SseElementType;
import software.uncharted.terarium.hmiserver.models.dataservice.Assets;
Expand All @@ -23,6 +24,7 @@
import software.uncharted.terarium.hmiserver.resources.SnakeCaseResource;
import software.uncharted.terarium.hmiserver.utils.Converter;
import software.uncharted.terarium.hmiserver.models.SimulationIntermediateResults;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import java.util.Map;
Expand Down Expand Up @@ -51,8 +53,9 @@ public class SimulationResource implements SnakeCaseResource {
@RestClient
DatasetProxy datasetProxy;

//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
@Inject
@Channel("simulationStatus") Publisher<SimulationIntermediateResults> partialSimulationResults;
@Channel("simulationStatus") Publisher<byte[]> partialSimulationStream;

@Broadcast
@Channel("simulationStatus")
Expand Down Expand Up @@ -162,25 +165,43 @@ public Response createFromSimulationResult(
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
@Tag(name = "Stream partial/intermediate simulation result associated with run ID")
public Publisher<SimulationIntermediateResults> stream(
public Publisher<byte[]> stream(
@PathParam("jobId") final String jobId
) {
return Multi.createFrom().publisher(partialSimulationResults).select().where(event -> event.getJobId().equals(jobId));
ObjectMapper mapper = new ObjectMapper();
return Multi.createFrom().publisher(partialSimulationStream).filter(event -> {
try{
//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
String jsonString = new String(event);
jsonString = jsonString.replace(" ","");

SimulationIntermediateResults interResult = mapper.readValue(jsonString, SimulationIntermediateResults.class);
Tom-Szendrey marked this conversation as resolved.
Show resolved Hide resolved

return interResult.getJobId().equals(jobId);
}
catch(Exception e){
log.error("Error occured while trying to convert simulation-status message to type: SimulationIntermediateResults");
log.error(event.toString());
log.error(e.toString());
return false;
}
});
}

// When we finalize the SimulationIntermediateResults object this end point will need to be passed more parameters
//TODO: https://github.com/DARPA-ASKEM/Terarium/issues/1757
@PUT
@Path("/{jobId}/create-partial-result")
@Produces(MediaType.APPLICATION_JSON)
@Tag(name = "Used to write to the simulation status channel providing a job ID")
public Response createPartialResult(
@PathParam("jobId") final String jobId
) {
float progress = 0.1f;
final SimulationIntermediateResults event = new SimulationIntermediateResults();
Double progress = 0.01;
SimulationIntermediateResults event = new SimulationIntermediateResults();
event.setJobId(jobId);
event.setProgress(progress);
partialSimulationEmitter.send(event);
return Response.ok(Map.of("id", jobId.toString())).build();
return Response.ok().build();
}
}