diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 62b1c5410..aaa7eb0fe 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -774ca3c3bfcfadd490cf878a2b389b6fec7ff1cb +dd9669f7b3ee0d3f1ae906d14c29fe7a71bace78 diff --git a/src/core/action.ts b/src/core/action.ts index 8824a5517..7b66b44c3 100644 --- a/src/core/action.ts +++ b/src/core/action.ts @@ -78,7 +78,6 @@ export abstract class SchedulableAction implements Sched { tag = tag.getMicroStepLater(); } } - if (this.action instanceof FederatePortAction) { if (intendedTag === undefined) { throw new Error("FederatedPortAction must have an intended tag from RTI."); @@ -95,10 +94,10 @@ export abstract class SchedulableAction implements Sched { Log.debug(this, () => "Using intended tag from RTI, similar to schedule_at_tag(tag) with an intended tag: " + intendedTag); tag = intendedTag; - } else if (this.action.origin == Origin.logical && !(this.action instanceof Startup)) { + } else { tag = tag.getMicroStepLater(); - } - + } + Log.debug(this, () => "Scheduling " + this.action.origin + " action " + this.action._getFullyQualifiedName() + " with tag: " + tag); @@ -140,8 +139,8 @@ export class Shutdown extends Action { } } -export class FederatePortAction extends Action { - constructor(__parent__: Reactor) { - super(__parent__, Origin.logical) +export class FederatePortAction extends Action { + constructor(__parent__: Reactor, origin: Origin) { + super(__parent__, origin) } -} \ No newline at end of file +} diff --git a/src/core/federation.ts b/src/core/federation.ts index 18a50172a..6c18fdd80 100644 --- a/src/core/federation.ts +++ b/src/core/federation.ts @@ -3,7 +3,7 @@ import {Socket, createConnection, SocketConnectOpts} from 'net' import {EventEmitter} from 'events'; import { Log, Tag, TimeValue, Origin, getCurrentPhysicalTime, Alarm, - Present, App, Action, FederatePortAction, TaggedEvent + Present, App, Action, TaggedEvent } from './internal'; //---------------------------------------------------------------------// @@ -241,15 +241,20 @@ class RTIClient extends EventEmitter { // The mapping between a federate port ID and the federate port action // scheduled upon reception of a message designated for that federate port. - private federatePortActionByID: Map = new Map(); + + /** + * A mapping from port IDs to FederatePortAction instances. Unfortunately, the data type of the action has to be `any`, + * meaning that the type checker cannot check whether uses of the action are type safe. + * In an alternative design, type information might be preserved. TODO(marten): Look into this. + */ + private federatePortActionByID: Map> = new Map>(); /** * Establish the mapping between a federate port's action and its ID. * @param federatePortID The federate port's ID. * @param federatePort The federate port's action. */ - public registerFederatePortAction(federatePortID: number, federatePortAction: Action) { - Object.setPrototypeOf(federatePortAction, FederatePortAction.prototype); + public registerFederatePortAction(federatePortID: number, federatePortAction: Action) { this.federatePortActionByID.set(federatePortID, federatePortAction); } @@ -420,13 +425,15 @@ class RTIClient extends EventEmitter { * @param destPortID The port ID for the port on the destination * federate to which this message should be sent. */ - public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) { - let msg = Buffer.alloc(data.length + 9); + public sendRTIMessage(data: T, destFederateID: number, destPortID: number) { + const value = Buffer.from(JSON.stringify(data), "utf-8"); + + let msg = Buffer.alloc(value.length + 9); msg.writeUInt8(RTIMessageTypes.MSG_TYPE_MESSAGE, 0); msg.writeUInt16LE(destPortID, 1); msg.writeUInt16LE(destFederateID, 3); - msg.writeUInt32LE(data.length, 5); - data.copy(msg, 9); // Copy data into the message + msg.writeUInt32LE(value.length, 5); + value.copy(msg, 9); // Copy data into the message try { Log.debug(this, () => {return `Sending RTI (untimed) message to ` + `federate ID: ${destFederateID} and port ID: ${destPortID}.`}); @@ -447,15 +454,17 @@ class RTIClient extends EventEmitter { * @param time The time of the message encoded as a 64 bit little endian * unsigned integer in a Buffer. */ - public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number, time: Buffer) { - let msg = Buffer.alloc(data.length + 21); + public sendRTITimedMessage(data: T, destFederateID: number, destPortID: number, time: Buffer) { + const value = Buffer.from(JSON.stringify(data), "utf-8"); + + let msg = Buffer.alloc(value.length + 21); msg.writeUInt8(RTIMessageTypes.MSG_TYPE_TAGGED_MESSAGE, 0); msg.writeUInt16LE(destPortID, 1); msg.writeUInt16LE(destFederateID, 3); - msg.writeUInt32LE(data.length, 5); + msg.writeUInt32LE(value.length, 5); time.copy(msg, 9); // Copy the current time into the message // FIXME: Add microstep properly. - data.copy(msg, 21); // Copy data into the message + value.copy(msg, 21); // Copy data into the message try { Log.debug(this, () => {return `Sending RTI (timed) message to ` + `federate ID: ${destFederateID}, port ID: ${destPortID} ` @@ -897,7 +906,7 @@ export class FederatedApp extends App { * unique among all port IDs on this federate and be a number between 0 and NUMBER_OF_PORTS - 1 * @param federatePort The federate port's action for registration. */ - public registerFederatePortAction(federatePortID: number, federatePortAction: Action) { + public registerFederatePortAction(federatePortID: number, federatePortAction: Action) { if (federatePortAction.origin === Origin.logical) { this.rtiSynchronized = true; } @@ -911,7 +920,7 @@ export class FederatedApp extends App { * @param destFederateID The ID of the federate intended to receive the message. * @param destPortID The ID of the federate's port intended to receive the message. */ - public sendRTIMessage(msg: Buffer, destFederateID: number, destPortID: number ) { + public sendRTIMessage(msg: T, destFederateID: number, destPortID: number ) { Log.debug(this, () => {return `Sending RTI message to federate ID: ${destFederateID}` + ` port ID: ${destPortID}`}); this.rtiClient.sendRTIMessage(msg, destFederateID, destPortID); @@ -925,7 +934,7 @@ export class FederatedApp extends App { * @param destFederateID The ID of the Federate intended to receive the message. * @param destPortID The ID of the FederateInPort intended to receive the message. */ - public sendRTITimedMessage(msg: Buffer, destFederateID: number, destPortID: number ) { + public sendRTITimedMessage(msg: T, destFederateID: number, destPortID: number ) { let time = this.util.getCurrentTag().toBinary(); Log.debug(this, () => {return `Sending RTI timed message to federate ID: ${destFederateID}` + ` port ID: ${destPortID} and time: ${time.toString('hex')}`}); @@ -1015,14 +1024,16 @@ export class FederatedApp extends App { } }); - this.rtiClient.on('message', (destPortAction: FederatePortAction, messageBuffer: Buffer) => { + this.rtiClient.on('message', (destPortAction: Action, messageBuffer: Buffer) => { // Schedule this federate port's action. // This message is untimed, so schedule it immediately. Log.debug(this, () => {return `(Untimed) Message received from RTI.`}) - destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer); + const value: T = JSON.parse(messageBuffer.toString()); + + destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value); }); - this.rtiClient.on('timedMessage', (destPortAction: FederatePortAction, messageBuffer: Buffer, + this.rtiClient.on('timedMessage', (destPortAction: Action, messageBuffer: Buffer, tag: Tag) => { // Schedule this federate port's action. @@ -1047,13 +1058,15 @@ export class FederatedApp extends App { // FIXME: implement decentralized control. Log.debug(this, () => {return `Timed Message received from RTI with tag ${tag}.`}) + const value: T = JSON.parse(messageBuffer.toString()); + if (destPortAction.origin == Origin.logical) { - destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer, tag); + destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value, tag); } else { // The schedule function for physical actions implements // Tr = max(r, R + A) - destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer); + destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value); } }); @@ -1092,4 +1105,4 @@ export class FederatedApp extends App { */ export class RemoteFederatePort { constructor(public federateID: number, public portID: number) {} -} \ No newline at end of file +} diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 6e658275c..7b6c3b095 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -1555,8 +1555,8 @@ interface UtilityFunctions { getCurrentPhysicalTime(): TimeValue; getElapsedLogicalTime(): TimeValue; getElapsedPhysicalTime(): TimeValue; - sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number): void; - sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number): void; + sendRTIMessage(data: T, destFederateID: number, destPortID: number): void; + sendRTITimedMessage(data: T, destFederateID: number, destPortID: number): void; } export interface MutationSandbox extends ReactionSandbox { @@ -1663,11 +1663,11 @@ export class App extends Reactor { return getCurrentPhysicalTime().subtract(this.app._startOfExecution); } - public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) { + public sendRTIMessage(data: T, destFederateID: number, destPortID: number) { return this.app.sendRTIMessage(data, destFederateID, destPortID); }; - public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) { + public sendRTITimedMessage(data: T, destFederateID: number, destPortID: number) { return this.app.sendRTITimedMessage(data, destFederateID, destPortID); }; @@ -1780,22 +1780,22 @@ export class App extends Reactor { /** * Send an (untimed) message to the designated federate port through the RTI. * This function throws an error if it isn't called on a FederatedApp. - * @param data A Buffer containing the body of the message. + * @param data The data that contain the body of the message. * @param destFederateID The federate ID that is the destination of this message. * @param destPortID The port ID that is the destination of this message. */ - protected sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) { + protected sendRTIMessage(data: T, destFederateID: number, destPortID: number) { throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp"); } /** * Send a (timed) message to the designated federate port through the RTI. * This function throws an error if it isn't called on a FederatedApp. - * @param data A Buffer containing the body of the message. + * @param data The data that contain the body of the message. * @param destFederateID The federate ID that is the destination of this message. * @param destPortID The port ID that is the destination of this message. */ - protected sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) { + protected sendRTITimedMessage(data: T, destFederateID: number, destPortID: number) { throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp"); }