Skip to content

Commit

Permalink
PUB: Add four zeromq publishers for TP data
Browse files Browse the repository at this point in the history
- The four publishers publish the same json, jst with a different period.
  There is a filter for live, 1s, 5s and 10s publishing interval.

- the data is published from the TP analysis thread including additional
  information available in the thread through the previous commit.
- The additional values are also returned by the thread and collected in
  the async buffer as well then in TPResult and in TPStorage.
- The involved waves and their respective getters were adapted with new
  elements that the additional data can be stored.
- As most of the elements store the same information, thus a constant
  was introduced with a dimension label list that is used as helper for
  the wave creation in the getter functions.
  • Loading branch information
MichaelHuth committed Aug 22, 2024
1 parent 27dac75 commit 6368bbb
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 155 deletions.
11 changes: 11 additions & 0 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,10 @@ StrConstant ANALYSIS_FUNCTION_SE = "analysis function:seal evaluation"
StrConstant ANALYSIS_FUNCTION_VM = "analysis function:true resting membrane potential"
StrConstant DAQ_TP_STATE_CHANGE_FILTER = "data acquisition:state change"
StrConstant ANALYSIS_FUNCTION_AR = "analysis function:access resistance smoke"
StrConstant ZMQ_FILTER_TPRESULT_NOW = "testpulse results live"
StrConstant ZMQ_FILTER_TPRESULT_1S = "testpulse results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S = "testpulse results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S = "testpulse results 10s update"
///@}

/// which is sufficient to represent each sample point time with a distinctive number up to rates of 10 MHz.
Expand Down Expand Up @@ -2307,3 +2311,10 @@ Constant SUTTER_MAX_MAX_TP_PULSES = 10000
Constant INVALID_SWEEP_NUMBER = -1

StrConstant PERCENT_F_MAX_PREC = "%.15f"

// If this constant with dimLabels is changed the following functions should be verified:
//
// TP_TSAnalysis
// GetTPResultAsyncBuffer
// GetTPResults (reuses same dimlabels partially)
StrConstant TP_ANALYSIS_DATA_LABELS = "BASELINE;STEADYSTATERES;INSTANTRES;ELEVATED_SS;ELEVATED_INST;NOW;HEADSTAGE;MARKER;NUMBER_OF_TP_CHANNELS;TIMESTAMP;TIMESTAMPUTC;CLAMPMODE;CLAMPAMP;BASELINEFRAC;CYCLEID;TPLENGTHPOINTSADC;PULSELENGTHPOINTSADC;PULSESTARTPOINTSADC;SAMPLINGINTERVALADC;TPLENGTHPOINTSDAC;PULSELENGTHPOINTSDAC;PULSESTARTPOINTSDAC;SAMPLINGINTERVALDAC;"
3 changes: 2 additions & 1 deletion Packages/MIES/MIES_ForeignFunctionInterface.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Function/WAVE FFI_GetAvailableMessageFilters()
PRESSURE_BREAKIN_FILTER, AUTO_TP_FILTER, AMPLIFIER_CLAMP_MODE_FILTER, \
AMPLIFIER_AUTO_BRIDGE_BALANCE, ANALYSIS_FUNCTION_PB, ANALYSIS_FUNCTION_SE, \
ANALYSIS_FUNCTION_VM, DAQ_TP_STATE_CHANGE_FILTER, \
ANALYSIS_FUNCTION_AR}
ANALYSIS_FUNCTION_AR, ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, \
ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}

Note/K wv, "Heartbeat is sent every 5 seconds."

Expand Down
90 changes: 86 additions & 4 deletions Packages/MIES/MIES_Publish.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@ static Function PUB_GetJSONTemplate(string device, variable headstage)
End

/// @brief Publish the given message as given by the JSON and the filter
static Function PUB_Publish(variable jsonID, string messageFilter)
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON])
variable err
string payload

payload = JSON_Dump(jsonID)
JSON_Release(jsonID)
releaseJSON = ParamIsDefault(releaseJSON) ? 1 : !!releaseJSON
payload = JSON_Dump(jsonID)
if(releaseJSON)
JSON_Release(jsonID)
endif

AssertOnAndClearRTError()
try
zeromq_pub_send(messageFilter, payload); AbortOnRTE
catch
err = ClearRTError()
BUG("Could not publish " + messageFilter + " due to: " + num2str(err))
BUG_TS("Could not publish " + messageFilter + " due to: " + num2str(err))
endtry
End

Expand Down Expand Up @@ -642,3 +645,82 @@ Function PUB_AccessResistanceSmoke(string device, variable sweepNo, variable hea

PUB_Publish(jsonID, ANALYSIS_FUNCTION_AR)
End

threadsafe static Function PUB_AddTPResultEntry(variable jsonId, string path, variable value, string unit)

if(IsEmpty(unit))
JSON_AddVariable(jsonID, path, value)
else
JSON_AddTreeObject(jsonID, path)
JSON_AddVariable(jsonID, path + "/value", value)
JSON_AddString(jsonID, path + "/unit", unit)
endif
End

threadsafe Function PUB_TPResult(STRUCT TPZMQData &tpzmq)

string path
variable jsonId = JSON_New()
string adUnit = GetADChannelUnit(tpzmq.clampMode)
string daUnit = GetDAChannelUnit(tpzmq.clampMode)

path = "properties"
JSON_AddTreeObject(jsonID, path)
JSON_AddVariable(jsonID, path + "/tp marker", tpzmq.marker)
JSON_AddString(jsonID, path + "/device", tpzmq.device)
JSON_AddVariable(jsonID, path + "/headstage", tpzmq.headstage)
JSON_AddVariable(jsonID, path + "/clamp mode", tpzmq.clampMode)

PUB_AddTPResultEntry(jsonId, path + "/time of tp acquisition", tpzmq.now, "s")
PUB_AddTPResultEntry(jsonId, path + "/clamp amplitude", tpzmq.clampAmp, daUnit)
PUB_AddTPResultEntry(jsonId, path + "/tp length ADC", tpzmq.tpLengthPointsADC, "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration ADC", tpzmq.pulseLengthPointsADC, "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point ADC", tpzmq.pulseStartPointsADC, "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval ADC", tpzmq.samplingIntervalADC, "ms")
PUB_AddTPResultEntry(jsonId, path + "/tp length DAC", tpzmq.tpLengthPointsDAC, "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse duration DAC", tpzmq.pulseLengthPointsDAC, "points")
PUB_AddTPResultEntry(jsonId, path + "/pulse start point DAC", tpzmq.pulseStartPointsDAC, "point")
PUB_AddTPResultEntry(jsonId, path + "/sample interval DAC", tpzmq.samplingIntervalDAC, "ms")
PUB_AddTPResultEntry(jsonId, path + "/baseline fraction", tpzmq.baselineFrac * ONE_TO_PERCENT, "%")
PUB_AddTPResultEntry(jsonId, path + "/timestamp", tpzmq.timeStamp, "s")
PUB_AddTPResultEntry(jsonId, path + "/timestampUTC", tpzmq.timeStampUTC, "s")
PUB_AddTPResultEntry(jsonId, path + "/tp cycle id", tpzmq.cycleId, "")

path = "results"
JSON_AddTreeObject(jsonID, path)
PUB_AddTPResultEntry(jsonId, path + "/average baseline steady state", tpzmq.avgBaselineSS, adUnit)
PUB_AddTPResultEntry(jsonId, path + "/average tp steady state", tpzmq.avgTPSS, adUnit)
PUB_AddTPResultEntry(jsonId, path + "/instantaneous", tpzmq.instVal, adUnit)
PUB_AddTPResultEntry(jsonId, path + "/steady state resistance", tpzmq.resistanceSS, "MΩ")
PUB_AddTPResultEntry(jsonId, path + "/instantaneous resistance", tpzmq.resistanceInst, "MΩ")

PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_NOW, releaseJSON = 0)
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_1S, 1))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_1S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_5S, 5))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_5S, releaseJSON = 0)
endif
if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_10S, 10))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_10S, releaseJSON = 0)
endif
JSON_Release(jsonID)
End

/// @brief Updates the publishing timestamp in the TUFXOP storage and returns 1 if an update is due (0 otherwise)
threadsafe static Function PUB_CheckPublishingTime(string pubFilter, variable period)

variable lastTime
variable curTime = DateTime

TUFXOP_AcquireLock/N=(pubFilter)
lastTime = TSDS_ReadVar(pubFilter, defValue = curTime, create = 1)
if(lastTime + period < curTime)
TSDS_Write(pubFilter, var = curTime + period)
TUFXOP_ReleaseLock/N=(pubFilter)
return 1
endif
TUFXOP_ReleaseLock/N=(pubFilter)

return 0
End
27 changes: 27 additions & 0 deletions Packages/MIES/MIES_Structures.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,30 @@ Structure SF_PlotMetaData
string xAxisLabel // from SF_META_XAXISLABEL constant
string yAxisLabel // from SF_META_YAXISLABEL constant
EndStructure

/// @brief Helper structure for TP data transfer to zeromq publisher
Structure TPZMQData
variable marker
string device
variable headstage
variable now
variable clampMode
variable clampAmp
variable tpLengthPointsADC
variable pulseLengthPointsADC
variable pulseStartPointsADC
variable samplingIntervalADC
variable tpLengthPointsDAC
variable pulseLengthPointsDAC
variable pulseStartPointsDAC
variable samplingIntervalDAC
variable baselineFrac
variable timeStamp
variable timeStampUTC
variable cycleId
variable avgBaselineSS
variable avgTPSS
variable instVal
variable resistanceSS
variable resistanceInst
EndStructure
6 changes: 3 additions & 3 deletions Packages/MIES/MIES_SweepFormula.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -3027,9 +3027,9 @@ static Function/WAVE SF_OperationTPImpl(string graph, WAVE/WAVE mode, WAVE/Z sel

// Assemble TP data
WAVE tpInput.data = SF_AverageTPFromSweep(epochMatches, sweepData)
tpInput.tpLengthPoints = DimSize(tpInput.data, ROWS)
tpInput.duration = (str2num(epochTPPulse[0][EPOCH_COL_ENDTIME]) - str2num(epochTPPulse[0][EPOCH_COL_STARTTIME])) * ONE_TO_MILLI / DimDelta(sweepData, ROWS)
tpInput.baselineFrac = TP_CalculateBaselineFraction(tpInput.duration, tpInput.duration + 2 * tpBaseLinePoints)
tpInput.tpLengthPointsADC = DimSize(tpInput.data, ROWS)
tpInput.pulseLengthPointsADC = (str2num(epochTPPulse[0][EPOCH_COL_ENDTIME]) - str2num(epochTPPulse[0][EPOCH_COL_STARTTIME])) * ONE_TO_MILLI / DimDelta(sweepData, ROWS)
tpInput.baselineFrac = TP_CalculateBaselineFraction(tpInput.pulseLengthPointsADC, tpInput.pulseLengthPointsADC + 2 * tpBaseLinePoints)

[WAVE settings, settingsIndex] = GetLastSettingChannel(numericalValues, textualValues, sweepNo, CLAMPMODE_ENTRY_KEY, dacChannelNr, XOP_CHANNEL_TYPE_DAC, DATA_ACQUISITION_MODE)
SFH_ASSERT(WaveExists(settings), "Failed to retrieve TP Clamp Mode from LBN")
Expand Down
Loading

0 comments on commit 6368bbb

Please sign in to comment.