Skip to content

Commit

Permalink
PUB: Add four publishers to publish TP data
Browse files Browse the repository at this point in the history
- The four publishers publish the same json, just with a different period.
  There is a filter for live, 1s, 5s and 10s publishing interval.

- See PUB_TPResult for JSON description

- publisher is called from TP_TSAnalysis thread
  • Loading branch information
MichaelHuth committed Sep 3, 2024
1 parent 2802321 commit 00941de
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 5 deletions.
4 changes: 4 additions & 0 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,10 @@ StrConstant PRESSURE_STATE_FILTER = "pressure:state"
StrConstant PRESSURE_SEALED_FILTER = "pressure:sealed"
StrConstant PRESSURE_BREAKIN_FILTER = "pressure:break in"
StrConstant AUTO_TP_FILTER = "testpulse:autotune result"
StrConstant ZMQ_FILTER_TPRESULT_NOW = "testpulse:results live"
StrConstant ZMQ_FILTER_TPRESULT_1S = "testpulse:results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S = "testpulse:results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S = "testpulse:results 10s update"
StrConstant AMPLIFIER_CLAMP_MODE_FILTER = "amplifier:clamp mode"
StrConstant AMPLIFIER_AUTO_BRIDGE_BALANCE = "amplifier:auto bridge balance"
StrConstant ANALYSIS_FUNCTION_PB = "analysis function:pipette in bath"
Expand Down
3 changes: 2 additions & 1 deletion Packages/MIES/MIES_ForeignFunctionInterface.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Function/WAVE FFI_GetAvailableMessageFilters()
PRESSURE_BREAKIN_FILTER, AUTO_TP_FILTER, AMPLIFIER_CLAMP_MODE_FILTER, \
AMPLIFIER_AUTO_BRIDGE_BALANCE, ANALYSIS_FUNCTION_PB, ANALYSIS_FUNCTION_SE, \
ANALYSIS_FUNCTION_VM, DAQ_TP_STATE_CHANGE_FILTER, \
ANALYSIS_FUNCTION_AR}
ANALYSIS_FUNCTION_AR, ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, \
ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}

Note/K wv, "Heartbeat is sent every 5 seconds."

Expand Down
185 changes: 181 additions & 4 deletions Packages/MIES/MIES_Publish.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@ static Function PUB_GetJSONTemplate(string device, variable headstage)
End

/// @brief Publish the given message as given by the JSON and the filter
static Function PUB_Publish(variable jsonID, string messageFilter)
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON])
variable err
string payload

payload = JSON_Dump(jsonID)
JSON_Release(jsonID)
releaseJSON = ParamIsDefault(releaseJSON) ? 1 : !!releaseJSON
payload = JSON_Dump(jsonID)
if(releaseJSON)
JSON_Release(jsonID)
endif

AssertOnAndClearRTError()
try
zeromq_pub_send(messageFilter, payload); AbortOnRTE
catch
err = ClearRTError()
BUG("Could not publish " + messageFilter + " due to: " + num2str(err))
BUG_TS("Could not publish " + messageFilter + " due to: " + num2str(err))
endtry
End

Expand Down Expand Up @@ -642,3 +645,177 @@ Function PUB_AccessResistanceSmoke(string device, variable sweepNo, variable hea

PUB_Publish(jsonID, ANALYSIS_FUNCTION_AR)
End

threadsafe static Function PUB_AddTPResultEntry(variable jsonId, string path, variable value, string unit)

if(IsEmpty(unit))
JSON_AddVariable(jsonID, path, value)
else
JSON_AddTreeObject(jsonID, path)
JSON_AddVariable(jsonID, path + "/value", value)
JSON_AddString(jsonID, path + "/unit", unit)
endif
End

/// Filter: #ZMQ_FILTER_TPRESULT_NOW
/// Filter: #ZMQ_FILTER_TPRESULT_1S
/// Filter: #ZMQ_FILTER_TPRESULT_5S
/// Filter: #ZMQ_FILTER_TPRESULT_10S
///
/// Example:
///
/// \rst
/// .. code-block:: json
///
/// {
/// "properties": {
/// "baseline fraction": {
/// "unit": "%",
/// "value": 35
/// },
/// "clamp amplitude": {
/// "unit": "mV",
/// "value": 10
/// },
/// "clamp mode": 0,
/// "device": "TestDevice",
/// "headstage": 1,
/// "pulse duration ADC": {
/// "unit": "points",
/// "value": 500
/// },
/// "pulse duration DAC": {
/// "unit": "points",
/// "value": 600
/// },
/// "pulse start point ADC": {
/// "unit": "point",
/// "value": 500
/// },
/// "pulse start point DAC": {
/// "unit": "point",
/// "value": 600
/// },
/// "sample interval ADC": {
/// "unit": "ms",
/// "value": 0.002
/// },
/// "sample interval DAC": {
/// "unit": "ms",
/// "value": 0.002
/// },
/// "time of tp acquisition": {
/// "unit": "s",
/// "value": 1000000
/// },
/// "timestamp": {
/// "unit": "s",
/// "value": 2000000
/// },
/// "timestampUTC": {
/// "unit": "s",
/// "value": 3000000
/// },
/// "tp cycle id": 456,
/// "tp length ADC": {
/// "unit": "points",
/// "value": 1500
/// },
/// "tp length DAC": {
/// "unit": "points",
/// "value": 1800
/// },
/// "tp marker": 1234
/// },
/// "results": {
/// "average baseline steady state": {
/// "unit": "pA",
/// "value": 2
/// },
/// "average tp steady state": {
/// "unit": "pA",
/// "value": 10
/// },
/// "instantaneous": {
/// "unit": "pA",
/// "value": 11
/// },
/// "instantaneous resistance": {
/// "unit": "MΩ",
/// "value": 2345
/// },
/// "steady state resistance": {
/// "unit": "MΩ",
/// "value": 1234
/// }
/// }
/// }
///
/// \endrst
threadsafe Function PUB_TPResult(string device, WAVE tpData)

string path
variable jsonId = JSON_New()
string adUnit = GetADChannelUnit(tpData[%CLAMPMODE])
string daUnit = GetDAChannelUnit(tpData[%CLAMPMODE])

path = "properties"
JSON_AddTreeObject(jsonID, path)
JSON_AddString(jsonID, path + "/device", device)
JSON_AddVariable(jsonID, path + "/tp marker", tpData[%MARKER])
JSON_AddVariable(jsonID, path + "/headstage", tpData[%HEADSTAGE])
JSON_AddVariable(jsonID, path + "/clamp mode", tpData[%CLAMPMODE])

PUB_AddTPResultEntry(jsonId, path + "/time of tp acquisition", tpData[%NOW], "s")
PUB_AddTPResultEntry(jsonId, path + "/clamp amplitude", tpData[%CLAMPAMP], daUnit)
PUB_AddTPResultEntry(jsonId, path + "/tp length ADC", tpData[%TPLENGTHPOINTSADC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration ADC", tpData[%PULSELENGTHPOINTSADC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point ADC", tpData[%PULSESTARTPOINTSADC], "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval ADC", tpData[%SAMPLINGINTERVALADC], "ms")
PUB_AddTPResultEntry(jsonId, path + "/tp length DAC", tpData[%TPLENGTHPOINTSDAC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration DAC", tpData[%PULSELENGTHPOINTSDAC], "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point DAC", tpData[%PULSESTARTPOINTSDAC], "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval DAC", tpData[%SAMPLINGINTERVALDAC], "ms")
PUB_AddTPResultEntry(jsonId, path + "/baseline fraction", tpData[%BASELINEFRAC] * ONE_TO_PERCENT, "%")
PUB_AddTPResultEntry(jsonId, path + "/timestamp", tpData[%TIMESTAMP], "s")
PUB_AddTPResultEntry(jsonId, path + "/timestampUTC", tpData[%TIMESTAMPUTC], "s")
PUB_AddTPResultEntry(jsonId, path + "/tp cycle id", tpData[%CYCLEID], "")

path = "results"
JSON_AddTreeObject(jsonID, path)
PUB_AddTPResultEntry(jsonId, path + "/average baseline steady state", tpData[%BASELINE], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/average tp steady state", tpData[%ELEVATED_SS], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/instantaneous", tpData[%ELEVATED_INST], adUnit)
PUB_AddTPResultEntry(jsonId, path + "/steady state resistance", tpData[%STEADYSTATERES], "MΩ")
PUB_AddTPResultEntry(jsonId, path + "/instantaneous resistance", tpData[%INSTANTRES], "MΩ")

PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_NOW, releaseJSON = 0)
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_1S, 1))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_1S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_5S, 5))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_5S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_10S, 10))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_10S, releaseJSON = 0)
endif
JSON_Release(jsonID)
End

/// @brief Updates the publishing timestamp in the TUFXOP storage and returns 1 if an update is due (0 otherwise)
threadsafe static Function PUB_CheckPublishingTime(string pubFilter, variable period)

variable lastTime
variable curTime = DateTime

TUFXOP_AcquireLock/N=(pubFilter)
lastTime = TSDS_ReadVar(pubFilter, defValue = 0, create = 1)
if(lastTime + period < curTime)
TSDS_Write(pubFilter, var = curTime + period)
TUFXOP_ReleaseLock/N=(pubFilter)
return 1
endif
TUFXOP_ReleaseLock/N=(pubFilter)

return 0
End
2 changes: 2 additions & 0 deletions Packages/MIES/MIES_TestPulse.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,8 @@ threadsafe Function/DF TP_TSAnalysis(dfrInp)
tpData[%PULSESTARTPOINTSDAC] = pulseStartPointsDAC
tpData[%SAMPLINGINTERVALDAC] = samplingIntervalDAC

PUB_TPResult(device, tpData)

return dfrOut
End

Expand Down
141 changes: 141 additions & 0 deletions Packages/tests/Basic/UTF_ZeroMQPublishing.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,144 @@ static Function CheckAccessResSmoke()

JSON_Release(jsonID)
End

static Function/WAVE PrepareTPData()

WAVE tpData = GetTPAnalysisDataWave()
tpData[%NOW] = 1E6
tpData[%HEADSTAGE] = 1
tpData[%MARKER] = 1234
tpData[%NUMBER_OF_TP_CHANNELS] = 2
tpData[%TIMESTAMP] = 2E6
tpData[%TIMESTAMPUTC] = 3E6
tpData[%CLAMPMODE] = V_CLAMP_MODE
tpData[%CLAMPAMP] = 10
tpData[%BASELINEFRAC] = 0.35
tpData[%CYCLEID] = 456
tpData[%TPLENGTHPOINTSADC] = 1500
tpData[%PULSELENGTHPOINTSADC] = 500
tpData[%PULSESTARTPOINTSADC] = 500
tpData[%SAMPLINGINTERVALADC] = 0.002
tpData[%TPLENGTHPOINTSDAC] = 1800
tpData[%PULSELENGTHPOINTSDAC] = 600
tpData[%PULSESTARTPOINTSDAC] = 600
tpData[%SAMPLINGINTERVALDAC] = 0.002

tpData[%BASELINE] = 2
tpData[%ELEVATED_SS] = 10
tpData[%ELEVATED_INST] = 11
tpData[%STEADYSTATERES] = 1234
tpData[%INSTANTRES] = 2345

return tpData
End

static Function CheckTPData(variable jsonId)

variable var
string stv, daUnit, adUnit
variable clampMode = V_CLAMP_MODE

daUnit = GetDAChannelUnit(clampMode)
adUnit = GetADChannelUnit(clampMode)

var = JSON_GetVariable(jsonID, "/properties/tp marker")
CHECK_EQUAL_VAR(var, 1234)
var = JSON_GetVariable(jsonID, "/properties/headstage")
CHECK_EQUAL_VAR(var, 1)
stv = JSON_GetString(jsonID, "/properties/device")
CHECK_EQUAL_STR(stv, "TestDevice")
var = JSON_GetVariable(jsonID, "/properties/clamp mode")
CHECK_EQUAL_VAR(var, clampMode)
var = JSON_GetVariable(jsonID, "/properties/time of tp acquisition/value")
CHECK_EQUAL_VAR(var, 1E6)
stv = JSON_GetString(jsonID, "/properties/time of tp acquisition/unit")
CHECK_EQUAL_STR(stv, "s")
var = JSON_GetVariable(jsonID, "/properties/clamp amplitude/value")
CHECK_EQUAL_VAR(var, 10)
stv = JSON_GetString(jsonID, "/properties/clamp amplitude/unit")
CHECK_EQUAL_STR(stv, daUnit)
var = JSON_GetVariable(jsonID, "/properties/tp length ADC/value")
CHECK_EQUAL_VAR(var, 1500)
stv = JSON_GetString(jsonID, "/properties/tp length ADC/unit")
CHECK_EQUAL_STR(stv, "points")
var = JSON_GetVariable(jsonID, "/properties/pulse duration ADC/value")
CHECK_EQUAL_VAR(var, 500)
stv = JSON_GetString(jsonID, "/properties/pulse duration ADC/unit")
CHECK_EQUAL_STR(stv, "points")
var = JSON_GetVariable(jsonID, "/properties/pulse start point ADC/value")
CHECK_EQUAL_VAR(var, 500)
stv = JSON_GetString(jsonID, "/properties/pulse start point ADC/unit")
CHECK_EQUAL_STR(stv, "point")
var = JSON_GetVariable(jsonID, "/properties/sample interval ADC/value")
CHECK_EQUAL_VAR(var, 0.002)
stv = JSON_GetString(jsonID, "/properties/sample interval ADC/unit")
CHECK_EQUAL_STR(stv, "ms")
var = JSON_GetVariable(jsonID, "/properties/tp length DAC/value")
CHECK_EQUAL_VAR(var, 1800)
stv = JSON_GetString(jsonID, "/properties/tp length DAC/unit")
CHECK_EQUAL_STR(stv, "points")
var = JSON_GetVariable(jsonID, "/properties/pulse duration DAC/value")
CHECK_EQUAL_VAR(var, 600)
stv = JSON_GetString(jsonID, "/properties/pulse duration DAC/unit")
CHECK_EQUAL_STR(stv, "points")
var = JSON_GetVariable(jsonID, "/properties/pulse start point DAC/value")
CHECK_EQUAL_VAR(var, 600)
stv = JSON_GetString(jsonID, "/properties/pulse start point DAC/unit")
CHECK_EQUAL_STR(stv, "point")
var = JSON_GetVariable(jsonID, "/properties/sample interval DAC/value")
CHECK_EQUAL_VAR(var, 0.002)
stv = JSON_GetString(jsonID, "/properties/sample interval DAC/unit")
CHECK_EQUAL_STR(stv, "ms")
var = JSON_GetVariable(jsonID, "/properties/baseline fraction/value")
CHECK_EQUAL_VAR(var, 35)
stv = JSON_GetString(jsonID, "/properties/baseline fraction/unit")
CHECK_EQUAL_STR(stv, "%")
var = JSON_GetVariable(jsonID, "/properties/timestamp/value")
CHECK_EQUAL_VAR(var, 2E6)
stv = JSON_GetString(jsonID, "/properties/timestamp/unit")
CHECK_EQUAL_STR(stv, "s")
var = JSON_GetVariable(jsonID, "/properties/timestampUTC/value")
CHECK_EQUAL_VAR(var, 3E6)
stv = JSON_GetString(jsonID, "/properties/timestampUTC/unit")
CHECK_EQUAL_STR(stv, "s")

var = JSON_GetVariable(jsonID, "/properties/tp cycle id")
CHECK_EQUAL_VAR(var, 456)

var = JSON_GetVariable(jsonID, "/results/average baseline steady state/value")
CHECK_EQUAL_VAR(var, 2)
stv = JSON_GetString(jsonID, "/results/average baseline steady state/unit")
CHECK_EQUAL_STR(stv, adUnit)
var = JSON_GetVariable(jsonID, "/results/average tp steady state/value")
CHECK_EQUAL_VAR(var, 10)
stv = JSON_GetString(jsonID, "/results/average tp steady state/unit")
CHECK_EQUAL_STR(stv, adUnit)
var = JSON_GetVariable(jsonID, "/results/instantaneous/value")
CHECK_EQUAL_VAR(var, 11)
stv = JSON_GetString(jsonID, "/results/instantaneous/unit")
CHECK_EQUAL_STR(stv, adUnit)
var = JSON_GetVariable(jsonID, "/results/steady state resistance/value")
CHECK_EQUAL_VAR(var, 1234)
stv = JSON_GetString(jsonID, "/results/steady state resistance/unit")
CHECK_EQUAL_STR(stv, "MΩ")
var = JSON_GetVariable(jsonID, "/results/instantaneous resistance/value")
CHECK_EQUAL_VAR(var, 2345)
stv = JSON_GetString(jsonID, "/results/instantaneous resistance/unit")
CHECK_EQUAL_STR(stv, "MΩ")
End

// IUTF_TD_GENERATOR DataGenerators#PUB_TPFilters
static Function CheckTPPublishing([string str])

variable jsonId

WAVE tpData = PrepareTPData()

TUFXOP_Clear/Z/N=(str)
PUB_TPResult("TestDevice", tpData)

jsonId = FetchAndParseMessage(str)
CheckTPData(jsonId)
JSON_Release(jsonID)
End
Loading

0 comments on commit 00941de

Please sign in to comment.