diff --git a/.gitignore b/.gitignore index 293846184..3b2a40b95 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ *.orig *.log .*.swp +.vscode \ No newline at end of file diff --git a/documentation/pvalink-schema-0.json b/documentation/pvalink-schema-0.json new file mode 100644 index 000000000..00abd9823 --- /dev/null +++ b/documentation/pvalink-schema-0.json @@ -0,0 +1,35 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://mdavidsaver.github.io/pvxs/pvalink-schema-0.json", + "title": "PVA Link schema", + "type": ["string", "object"], + "properties": { + "pv": { "type": "string" }, + "field": { + "type": "string", + "default": "value" + }, + "Q": { + "type": "integer", + "default": 4 + }, + "proc": { + "type": ["boolean", "string", "null"], + "enum": [true, false, null, "", "NPP", "PP", "CP", "CPP"], + "default": null + }, + "sevr": { + "type": ["boolean", "string"], + "enum": [true, false, "NMS", "MS", "MSI", "MSS"], + "default": "NMS" + }, + "time": { "type": "boolean", "default": false }, + "monorder": { "type": "integer", "default": 0 }, + "defer": { "type": "boolean", "default": false }, + "retry": { "type": "boolean", "default": false }, + "pipeline": { "type": "boolean", "default": false }, + "always": { "type": "boolean", "default": false }, + "local": { "type": "boolean", "default": false } + }, + "additionalProperties": false +} diff --git a/ioc/Makefile b/ioc/Makefile index 4c0cea824..5f9f7c884 100644 --- a/ioc/Makefile +++ b/ioc/Makefile @@ -57,10 +57,11 @@ pvxsIoc_SRCS += groupconfigprocessor.cpp pvxsIoc_SRCS += groupprocessorcontext.cpp pvxsIoc_SRCS += groupsource.cpp pvxsIoc_SRCS += groupsourcehooks.cpp - -else # BASE_7_0 - -pvxsIoc_SRCS += dummygroup.cpp +pvxsIoc_SRCS += pvalink.cpp +pvxsIoc_SRCS += pvalink_channel.cpp +pvxsIoc_SRCS += pvalink_jlif.cpp +pvxsIoc_SRCS += pvalink_link.cpp +pvxsIoc_SRCS += pvalink_lset.cpp endif # BASE_7_0 @@ -84,3 +85,10 @@ include $(TOP)/configure/RULES_PVXS_MODULE #---------------------------------------- # ADD RULES AFTER THIS LINE +ifdef BASE_7_0 +../O.Common/pvxsIoc.dbd: ../pvxs7x.dbd + $(CP) $< $@ +else +../O.Common/pvxsIoc.dbd: ../pvxs3x.dbd + $(CP) $< $@ +endif \ No newline at end of file diff --git a/ioc/pvalink.cpp b/ioc/pvalink.cpp new file mode 100644 index 000000000..24eb59f85 --- /dev/null +++ b/ioc/pvalink.cpp @@ -0,0 +1,358 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include + +#define EPICS_DBCA_PRIVATE_API +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "pvalink.h" +#include "dblocker.h" +#include "dbentry.h" +#include "iocshcommand.h" +#include "utilpvt.h" + +#include /* redirects stdout/stderr; include after util.h from libevent */ +#include /* defines epicsExportSharedSymbols */ + +#if EPICS_VERSION_INT>=VERSION_INT(7,0,6,0) +# define HAVE_SHUTDOWN_HOOKS +#endif + +namespace pvxs { namespace ioc { + +using namespace pvxlink; + +namespace { + +// halt, and clear, scan workers before dbCloseLinks() (cf. 
iocShutdown()) +static void shutdownStep1() +{ + // no locking here as we assume that shutdown doesn't race startup + if(!pvaGlobal) return; + + pvaGlobal->close(); +} + +// Cleanup pvaGlobal, including PVA client and QSRV providers ahead of PDB cleanup +// specifically QSRV provider must be free'd prior to db_cleanup_events() +static void shutdownStep2() +{ + if(!pvaGlobal) return; + + { + Guard G(pvaGlobal->lock); + if(pvaGlobal->channels.size()) { + fprintf(stderr, "pvaLink leaves %zu channels open\n", + pvaGlobal->channels.size()); + } + } + + delete pvaGlobal; + pvaGlobal = NULL; +} + +#ifndef HAVE_SHUTDOWN_HOOKS +static void stopPVAPool(void*) +{ + try { + shutdownStep1(); + }catch(std::exception& e){ + fprintf(stderr, "Error while stopping PVA link pool : %s\n", e.what()); + } +} + +static void finalizePVA(void*) +{ + try { + shutdownStep2(); + }catch(std::exception& e){ + fprintf(stderr, "Error initializing pva link handling : %s\n", e.what()); + } +} +#endif + +/* The Initialization game... + * + * # Parse links during dbPutString() (calls our jlif*) + * # announce initHookAfterCaLinkInit + * # dbChannelInit() (needed for QSRV to work) + * # Re-parse links (calls to our jlif*) + * # Open links. Calls jlif::get_lset() and then lset::openLink() + * # announce initHookAfterInitDatabase + * # ... scan threads start ... + * # announce initHookAfterIocBuilt + */ +void initPVALink(initHookState state) +{ + try { + if(state==initHookAfterCaLinkInit) { + // before epicsExit(exitDatabase), + // so hook registered here will be run after iocShutdown() + // which closes links + if(pvaGlobal) { + cantProceed("# Missing call to testqsrvShutdownOk() and/or testqsrvCleanup()"); + } + pvaGlobal = new pvaGlobal_t; + +#ifndef HAVE_SHUTDOWN_HOOKS + static bool atexitInstalled; + if(!atexitInstalled) { + epicsAtExit(finalizePVA, NULL); + atexitInstalled = true; + } +#endif + + } else if(state==initHookAfterInitDatabase) { + // TODO "local" provider + if (inUnitTest()) { + pvaGlobal->provider_remote = ioc::server().clientConfig().build(); + } else { + pvaGlobal->provider_remote = client::Config().build(); + } + + } else if(state==initHookAfterIocBuilt) { + // after epicsExit(exitDatabase) + // so hook registered here will be run before iocShutdown() + +#ifndef HAVE_SHUTDOWN_HOOKS + epicsAtExit(stopPVAPool, NULL); +#endif + + Guard G(pvaGlobal->lock); + pvaGlobal->running = true; + + for(pvaGlobal_t::channels_t::iterator it(pvaGlobal->channels.begin()), end(pvaGlobal->channels.end()); + it != end; ++it) + { + std::shared_ptr chan(it->second.lock()); + if(!chan) continue; + + chan->open(); + } +#ifdef HAVE_SHUTDOWN_HOOKS + } else if(state==initHookAtShutdown) { + shutdownStep1(); + + } else if(state==initHookAfterShutdown) { + shutdownStep2(); +#endif + } + }catch(std::exception& e){ + cantProceed("Error initializing pva link handling : %s\n", e.what()); + } +} + +} // namespace + +// halt, and clear, scan workers before dbCloseLinks() (cf. 
iocShutdown()) +void testqsrvShutdownOk(void) +{ + try { + shutdownStep1(); + }catch(std::exception& e){ + testAbort("Error while stopping PVA link pool : %s\n", e.what()); + } +} + +void testqsrvCleanup(void) +{ + try { + shutdownStep2(); + }catch(std::exception& e){ + testAbort("Error initializing pva link handling : %s\n", e.what()); + } +} + +void testqsrvWaitForLinkEvent(struct link *plink) +{ + std::shared_ptr lchan; + { + DBLocker lock(plink->precord); + + if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) { + testAbort("Not a PVA link"); + } + pvaLink *pval = static_cast(plink->value.json.jlink); + lchan = pval->lchan; + } + if(lchan) { + lchan->run_done.wait(); + } +} + +extern "C" +void dbpvar(const char *precordname, int level) +{ + try { + if(!pvaGlobal) { + printf("PVA links not initialized\n"); + return; + } + + if (!precordname || precordname[0] == '\0' || !strcmp(precordname, "*")) { + precordname = NULL; + printf("PVA links in all records\n\n"); + } else { + printf("PVA links in record named '%s'\n\n", precordname); + } + + size_t nchans = 0, nlinks = 0, nconn = 0; + + pvaGlobal_t::channels_t channels; + { + Guard G(pvaGlobal->lock); + channels = pvaGlobal->channels; // copy snapshot + } + + for(pvaGlobal_t::channels_t::const_iterator it(channels.begin()), end(channels.end()); + it != end; ++it) + { + std::shared_ptr chan(it->second.lock()); + if(!chan) continue; + + Guard G(chan->lock); + + if(precordname) { + // only show links fields of these records + bool match = false; + for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end()); + it2 != end2; ++it2) + { + const pvaLink *pval = *it2; + // plink==NULL shouldn't happen, but we are called for debugging, so be paranoid. + if(pval->plink && epicsStrGlobMatch(pval->plink->precord->name, precordname)) { + match = true; + nlinks++; + } + } + if(!match) + continue; + } + + nchans++; + if(chan->state == pvaLinkChannel::Connected) + nconn++; + + if(!precordname) + nlinks += chan->links.size(); + + if(level<=0) + continue; + + if(level>=2 || (chan->state != pvaLinkChannel::Connected && level==1)) { + if(chan->key.first.size()<=28) { + printf("%28s ", chan->key.first.c_str()); + } else { + printf("%s\t", chan->key.first.c_str()); + } + + printf("conn=%c %zu disconnects, %zu type changes", + chan->state == pvaLinkChannel::Connected?'T':'F', + chan->num_disconnect, + chan->num_type_change); + if(chan->op_put) { + printf(" Put"); + } + + if(level>=3) { + printf(", provider '%s'", chan->providerName.c_str()); + } + printf("\n"); + // level 4 reserved for channel/provider details + + if(level>=5) { + for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end()); + it2 != end2; ++it2) + { + const pvaLink *pval = *it2; + + if(!pval->plink) + continue; + else if(precordname && !epicsStrGlobMatch(pval->plink->precord->name, precordname)) + continue; + + const char *fldname = "???"; + ioc::DBEntry rec(pval->plink->precord); + for(bool done = !!dbFirstField(rec, 0); !done; done = !!dbNextField(rec, 0)) + { + if(rec->pfield == (void*)pval->plink) { + fldname = rec->pflddes->name; + break; + } + } + + printf("%*s%s.%s", 30, "", pval->plink ? 
pval->plink->precord->name : "", fldname); + + switch(pval->proc) { + case pvaLinkConfig::NPP: printf(" NPP"); break; + case pvaLinkConfig::Default: printf(" Def"); break; + case pvaLinkConfig::PP: printf(" PP"); break; + case pvaLinkConfig::CP: printf(" CP"); break; + case pvaLinkConfig::CPP: printf(" CPP"); break; + } + switch(pval->sevr) { + case pvaLinkConfig::NMS: printf(" NMS"); break; + case pvaLinkConfig::MS: printf(" MS"); break; + case pvaLinkConfig::MSI: printf(" MSI"); break; + } + + printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d\n", + unsigned(pval->queueSize), + pval->pipeline ? 'T' : 'F', + pval->defer ? 'T' : 'F', + pval->time ? 'T' : 'F', + pval->retry ? 'T' : 'F', + pval->monorder); + } + printf("\n"); + } + } + } + + printf(" %zu/%zu channels connected used by %zu links\n", + nconn, nchans, nlinks); + + } catch(std::exception& e) { + fprintf(stderr, "Error: %s\n", e.what()); + } +} + +static +void installPVAAddLinkHook() +{ + initHookRegister(&initPVALink); + IOCShCommand("dbpvar", "dbpvar", "record name", "level") + .implementation<&dbpvar>(); +// epics::registerRefCounter("pvaLinkChannel", &pvaLinkChannel::num_instances); +// epics::registerRefCounter("pvaLink", &pvaLink::num_instances); +} + +}} // namespace pvxs::ioc + +extern "C" { + using pvxs::ioc::installPVAAddLinkHook; + epicsExportRegistrar(installPVAAddLinkHook); + epicsExportAddress(int, pvaLinkNWorkers); +} diff --git a/ioc/pvalink.h b/ioc/pvalink.h new file mode 100644 index 000000000..d79749e64 --- /dev/null +++ b/ioc/pvalink.h @@ -0,0 +1,235 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PVALINK_H +#define PVALINK_H + +#include +#include + +#define EPICS_DBCA_PRIVATE_API +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "dbmanylocker.h" + +extern "C" { + extern int pvaLinkNWorkers; +} + +namespace pvxlink { +using namespace pvxs; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +struct pvaLink; +struct pvaLinkChannel; + +extern lset pva_lset; +extern jlif lsetPVA; + +struct pvaLinkConfig : public jlink +{ + // configuration, output of jlif parsing + //! Channel (aka PV) name string + std::string channelName; + //! sub-field within addressed PVStructure + std::string fieldName; + + size_t queueSize = 4; + + enum pp_t { + NPP, + Default, // for put() only. 
For monitor, treated as NPP + PP, // for put() only, For monitor, treated as NPP + CP, // for monitor only, put treats as pp + CPP, // for monitor only, put treats as pp + } proc = Default; + enum ms_t { + NMS, + MS, + MSI, + } sevr = NMS; + + bool defer = false; + bool pipeline = false; + bool time = false; + bool retry = false; + bool local = false; + bool always = false; + int monorder = 0; + + // internals used by jlif parsing + std::string jkey; + + virtual ~pvaLinkConfig(); +}; + +struct pvaGlobal_t : private epicsThreadRunable { + client::Context provider_remote; + + MPMCFIFO> queue; + + epicsMutex lock; + + bool running; // set after dbEvent is initialized and safe to use + + // a tuple of channel name and printed pvRequest (or Monitor) + typedef std::pair channels_key_t; + // pvaLinkChannel dtor prunes dead entires + typedef std::map > channels_t; + // Cache of active Channels (really about caching Monitor) + channels_t channels; + +private: + epicsThread worker; + bool workerStop = false; + virtual void run() override final; +public: + + pvaGlobal_t(); + virtual ~pvaGlobal_t(); + void close(); +}; +extern pvaGlobal_t *pvaGlobal; + +struct pvaLinkChannel : public epicsThreadRunable + ,public std::enable_shared_from_this +{ + const pvaGlobal_t::channels_key_t key; // tuple of (channelName, pvRequest key) + const Value pvRequest; // used with monitor + + static size_t num_instances; + + epicsMutex lock; + epicsEvent run_done; // used by testing code + +// std::shared_ptr chan; + std::shared_ptr op_mon; + std::shared_ptr op_put; + Value root; + + std::string providerName; + size_t num_disconnect = 0u, num_type_change = 0u; + enum state_t { + Disconnected, + Connecting, + Connected, + } state = Disconnected; + + bool isatomic = false; + bool debug = false; // set if any jlink::debug is set + typedef std::set after_put_t; + after_put_t after_put; + + struct LinkSort { + bool operator()(const pvaLink *L, const pvaLink *R) const; + }; + + typedef std::set links_t; + + // list of currently attached links. maintained by pvaLink ctor/dtor + // TODO: sort by PHAS + links_t links; + + // set when 'links' is modified to trigger re-compute of record scan list + bool links_changed = false; + + pvaLinkChannel(const pvaGlobal_t::channels_key_t& key, const Value &pvRequest); + virtual ~pvaLinkChannel(); + + void open(); + void put(bool force=false); // begin Put op. + + struct AfterPut : public epicsThreadRunable { + std::weak_ptr lc; + virtual ~AfterPut() {} + virtual void run() override final; + }; + std::shared_ptr AP; +private: + virtual void run() override final; + void run_dbProcess(size_t idx); // idx is index in scan_records + + // ==== Treat remaining as local to run() + + std::vector scan_records; + std::vector scan_check_passive; + + ioc::DBManyLock atomic_lock; +}; + +struct pvaLink final : public pvaLinkConfig +{ + static size_t num_instances; + + bool alive = true; // attempt to catch some use after free + dbfType type = (dbfType)-1; + + DBLINK * plink = nullptr; + + std::shared_ptr lchan; + + bool used_scratch = false; + bool used_queue = false; + shared_array put_scratch, put_queue; + + // cached fields from channel op_mon + // updated in onTypeChange() + Value fld_value; + Value fld_severity, + fld_seconds, + fld_nanoseconds; + Value fld_display, + fld_control, + fld_valueAlarm; + + // cached snapshot of alarm and timestamp + // captured in pvaGetValue(). 
+ // we choose not to ensure consistency with display/control meta-data + epicsTimeStamp snap_time = {}; + short snap_severity = INVALID_ALARM; + + pvaLink(); + virtual ~pvaLink(); + + // returns pvRequest to be used with monitor + Value makeRequest(); + + bool valid() const; + + // fetch a sub-sub-field of the top monitored field. + Value getSubField(const char *name); + + void onDisconnect(); + void onTypeChange(); +}; + + +} // namespace pvalink + +#endif // PVALINK_H diff --git a/ioc/pvalink_channel.cpp b/ioc/pvalink_channel.cpp new file mode 100644 index 000000000..308128fb0 --- /dev/null +++ b/ioc/pvalink_channel.cpp @@ -0,0 +1,462 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include + +#include "pvalink.h" +#include "dblocker.h" +#include "dbmanylocker.h" + +DEFINE_LOGGER(_logger, "ioc.pvalink.channel"); + +int pvaLinkNWorkers = 1; + +namespace pvxlink { +using namespace pvxs; + +pvaGlobal_t *pvaGlobal; + + +pvaGlobal_t::pvaGlobal_t() + :queue() + ,running(false) + ,worker(*this, + "pvxlink", + epicsThreadGetStackSize(epicsThreadStackBig), + // worker should be above PVA worker priority? + epicsThreadPriorityMedium) +{ + // TODO respect pvaLinkNWorkers? + worker.start(); +} + +pvaGlobal_t::~pvaGlobal_t() +{ +} + +void pvaGlobal_t::run() +{ + while(1) { + auto w = queue.pop(); + if(auto chan = w.lock()) { + chan->run(); + } + { + Guard G(lock); + if(workerStop) + break; + } + } + +} + +void pvaGlobal_t::close() +{ + { + Guard G(lock); + workerStop = true; + } + queue.push(std::weak_ptr()); + worker.exitWait(); +} + +size_t pvaLinkChannel::num_instances; +size_t pvaLink::num_instances; + + +bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const { + if(L->monorder==R->monorder) + return L < R; + return L->monorder < R->monorder; +} + +// being called with pvaGlobal::lock held +pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const Value& pvRequest) + :key(key) + ,pvRequest(pvRequest) + ,AP(new AfterPut) +{} + +pvaLinkChannel::~pvaLinkChannel() { + { + Guard G(pvaGlobal->lock); + pvaGlobal->channels.erase(key); + } + + Guard G(lock); + + assert(links.empty()); +} + +void pvaLinkChannel::open() +{ + Guard G(lock); + + op_mon = pvaGlobal->provider_remote.monitor(key.first) + .maskConnected(true) + .maskDisconnected(false) + .rawRequest(pvRequest) + .event([this](const client::Subscription&) + { + log_debug_printf(_logger, "Received message: %s %s\n", key.first.c_str(), key.second.c_str()); + pvaGlobal->queue.push(shared_from_this()); + }) + .exec(); + providerName = "remote"; +} + +static +Value linkBuildPut(pvaLinkChannel *self, Value&& prototype) +{ + Guard G(self->lock); + + auto top(std::move(prototype)); + + for(auto link : self->links) + { + if(!link->used_queue) continue; + link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop + + auto value(link->fieldName.empty() ? top : top[link->fieldName]); + if(value.type()==TypeCode::Struct) { + // maybe drill into NTScalar et al. + if(auto sub = value["value"]) + value = std::move(sub); + } + + if(!value) continue; // TODO: how to signal error? 
+ + auto tosend(std::move(link->put_queue)); + + if(value.type().isarray()) { + value = tosend; + } else { + if (tosend.empty()) + continue; // TODO: Signal error + + if (value.type() == TypeCode::Struct && value.id() == "enum_t") { + value = value["index"]; // We want to assign to the index for enum types + } + + switch (tosend.original_type()) + { + case ArrayType::Int8: value = tosend.castTo()[0]; break; + case ArrayType::Int16: value = tosend.castTo()[0]; break; + case ArrayType::Int32: value = tosend.castTo()[0]; break; + case ArrayType::Int64: value = tosend.castTo()[0]; break; + case ArrayType::UInt8: value = tosend.castTo()[0]; break; + case ArrayType::UInt16: value = tosend.castTo()[0]; break; + case ArrayType::UInt32: value = tosend.castTo()[0]; break; + case ArrayType::UInt64: value = tosend.castTo()[0]; break; + case ArrayType::Float32: value = tosend.castTo()[0]; break; + case ArrayType::Float64: value = tosend.castTo()[0]; break; + case ArrayType::String: value = tosend.castTo()[0]; break; + case ArrayType::Bool: + case ArrayType::Null: + case ArrayType::Value: + std::ostringstream buffer; + buffer << tosend.original_type(); + log_exc_printf(_logger, "Unsupported type %s\n", buffer.str().c_str()); + } + } + } + log_debug_printf(_logger, "%s put built\n", self->key.first.c_str()); + + return top; +} + +void linkPutDone(pvaLinkChannel *self, client::Result&& result) +{ + bool ok = false; + try { + result(); + ok = true; + }catch(std::exception& e){ + errlogPrintf("%s PVA link put ERROR: %s\n", self->key.first.c_str(), e.what()); + } + + bool needscans; + { + Guard G(self->lock); + + log_debug_printf(_logger, "%s put result %s\n", self->key.first.c_str(), ok ? "OK" : "Not OK"); + + needscans = !self->after_put.empty(); + self->op_put.reset(); + + if(ok) { + // see if we need start a queue'd put + self->put(); + } + } + + if(needscans) { + pvaGlobal->queue.push(self->AP); + } +} + +// call with channel lock held +void pvaLinkChannel::put(bool force) +{ + // TODO cache TypeDef in global + using namespace pvxs::members; + auto pvReq(TypeDef(TypeCode::Struct, { + Struct("field", {}), + Struct("record", { + Struct("_options", { + Bool("block"), + String("process"), + }), + }), }).create() + .update("record._options.block", !after_put.empty())); + + unsigned reqProcess = 0; + bool doit = force; + for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) + { + pvaLink *link = *it; + + if(!link->used_scratch) continue; + + link->put_queue = std::move(link->put_scratch); + link->used_scratch = false; + link->used_queue = true; + + doit = true; + + switch(link->proc) { + case pvaLink::NPP: + reqProcess |= 1; + break; + case pvaLink::Default: + break; + case pvaLink::PP: + case pvaLink::CP: + case pvaLink::CPP: + reqProcess |= 2; + break; + } + } + + /* By default, use remote default (passive). + * Request processing, or not, if any link asks. + * Prefer PP over NPP if both are specified. + * + * TODO: per field granularity? + */ + const char *proc = "passive"; + if((reqProcess&2) || force) { + proc = "true"; + } else if(reqProcess&1) { + proc = "false"; + } + pvReq["record._options.process"] = proc; + + log_debug_printf(_logger, "%s Start put %s\n", key.first.c_str(), doit ? 
"true": "false"); + if(doit) { + // start net Put, cancels in-progress put + op_put = pvaGlobal->provider_remote.put(key.first) + .build([this](Value&& prototype) -> Value + { + return linkBuildPut(this, std::move(prototype)); // TODO + }) + .result([this](client::Result&& result) + { + linkPutDone(this, std::move(result)); + }) + .exec(); + } +} + +void pvaLinkChannel::AfterPut::run() +{ + std::set toscan; + std::shared_ptr link(lc.lock()); + if(!link) + return; + + { + Guard G(link->lock); + toscan.swap(link->after_put); + } + + for(after_put_t::iterator it=toscan.begin(), end=toscan.end(); + it!=end; ++it) + { + dbCommon *prec = *it; + dbScanLock(prec); + if(prec->pact) { // complete async. processing + (prec)->rset->process(prec); + + } else { + // maybe the result of "cancellation" or some record support logic error? + errlogPrintf("%s : not PACT when async PVA link completed. Logic error?\n", prec->name); + } + dbScanUnlock(prec); + } + +} + +// the work in calling dbProcess() which is common to +// both dbScanLock() and dbScanLockMany() +void pvaLinkChannel::run_dbProcess(size_t idx) +{ + dbCommon *precord = scan_records[idx]; + + if(scan_check_passive[idx] && precord->scan!=0) { + return; + + // TODO: This relates to caching of the individual links and comparing it to + // the posted monitor. This is, as I understand it, an optimisation and + // we can sort of ignore it for now. + //} else if(state_latched == Connected && !op_mon.changed.logical_and(scan_changed[idx])) { + // return; + + } else if (precord->pact) { + if (precord->tpro) + printf("%s: Active %s\n", + epicsThreadGetNameSelf(), precord->name); + precord->rpro = TRUE; + + } + dbProcess(precord); +} + +// Running from global WorkQueue thread +void pvaLinkChannel::run() +{ + bool requeue = false; + { + Guard G(lock); + + log_debug_printf(_logger,"Running task %s\n", this->key.first.c_str()); + + Value top; + try { + top = op_mon->pop(); + if(!top) { + log_debug_printf(_logger, "Queue empty %s\n", this->key.first.c_str()); + run_done.signal(); + return; + } + state = Connected; + } catch(client::Disconnect&) { + log_debug_printf(_logger, "PVA link %s received disonnection event\n", this->key.first.c_str()); + + state = Disconnected; + + num_disconnect++; + + // cancel pending put operations + op_put.reset(); + + for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) + { + pvaLink *link = *it; + link->onDisconnect(); + } + + // Don't clear previous_root on disconnect. + // We will usually re-connect with the same type, + // and may get back the same PVStructure. + + } catch(std::exception& e) { + errlogPrintf("pvalinkChannel::run: Unexpected exception while reading from monitor queue: %s\n", e.what()); + } + + if (state == Connected) { + // Fetch the data from the incoming monitor + if (root.equalType(top)) + { + log_debug_printf(_logger, "pvalinkChannel update value %s\n", this->key.first.c_str()); + + root.assign(top); + } + else + { + log_debug_printf(_logger, "pvalinkChannel %s update type\n", this->key.first.c_str()); + root = top; + num_type_change++; + + for (links_t::iterator it(links.begin()), end(links.end()); it != end; ++it) + { + pvaLink *link = *it; + link->onTypeChange(); + } + } + + requeue = true; + } + + if(links_changed) { + // a link has been added or removed since the last update. + // rebuild our cached list of records to (maybe) process. 
+ + scan_records.clear(); + scan_check_passive.clear(); + + for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) + { + pvaLink *link = *it; + assert(link && link->alive); + + if(!link->plink) continue; + + // only scan on monitor update for input links + if(link->type!=DBF_INLINK) + continue; + + // NPP and none/Default don't scan + // PP, CP, and CPP do scan + // PP and CPP only if SCAN=Passive + if(link->proc != pvaLink::PP && link->proc != pvaLink::CPP && link->proc != pvaLink::CP) + continue; + + scan_records.push_back(link->plink->precord); + scan_check_passive.push_back(link->proc != pvaLink::CP); + } + + log_debug_printf(_logger, "Links changed, scan_records size = %lu\n", scan_records.size()); + + atomic_lock = ioc::DBManyLock(scan_records); + + links_changed = false; + } + } + + if(scan_records.empty()) { + // Nothing to do, so don't bother locking + + } else if(isatomic && scan_records.size() > 1u) { + ioc::DBManyLocker L(atomic_lock); + + for(size_t i=0, N=scan_records.size(); iname); + + ioc::DBLocker L(scan_records[i]); + run_dbProcess(i); + } + } + + if(requeue) { + log_debug_printf(_logger, "Requeueing %s\n", key.first.c_str()); + // re-queue until monitor queue is empty + pvaGlobal->queue.push(shared_from_this()); + } else { + log_debug_printf(_logger, "Run done instead of requeue %s\n", key.first.c_str()); + run_done.signal(); + } +} + +} // namespace pvalink diff --git a/ioc/pvalink_jlif.cpp b/ioc/pvalink_jlif.cpp new file mode 100644 index 000000000..909034946 --- /dev/null +++ b/ioc/pvalink_jlif.cpp @@ -0,0 +1,304 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include // redirects stdout/stderr + +#include "pvalink.h" + +#include + +namespace pvxlink { +pvaLinkConfig::~pvaLinkConfig() {} + +namespace { + +/* link options. 
+ * + * "pvname" # short-hand, sets PV name only + * + * { + * "pv":"name", + * "field":"blah.foo", + * "Q":5, + * "pipeline":false, + * "proc":true, // false, true, none, "", "NPP", "PP", "CP", "CPP" + * "sevr":true, // false, true, "NMS", "MS", "MSI", "MSS" + * "time":true, // false, true + * "monorder":#,// order of processing during CP scan + * "defer":true,// whether to immediately start Put, or only queue value to be sent + * "retry":true,// queue Put while disconnected, and retry on connect + * "always":true,// CP/CPP updates always process a like, even if its input field hasn't changed + * "local":false,// Require local channel + * } + */ + +jlink* pva_alloc_jlink(short) +{ + try { + return new pvaLink; + + }catch(std::exception& e){ + errlogPrintf("Error allocating pva link: %s\n", e.what()); + return NULL; + } +} + +#define TRY pvaLinkConfig *pvt = static_cast(pjlink); (void)pvt; try +#define CATCH(RET) catch(std::exception& e){ \ + errlogPrintf("Error in %s link: %s\n", __FUNCTION__, e.what()); \ + return RET; } + +void pva_free_jlink(jlink *pjlink) +{ + TRY { + delete pvt; + }catch(std::exception& e){ + errlogPrintf("Error freeing pva link: %s\n", e.what()); + } +} + +jlif_result pva_parse_null(jlink *pjlink) +{ + TRY { + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "proc") { + pvt->proc = pvaLinkConfig::Default; + } else if(pvt->jkey == "sevr") { + pvt->sevr = pvaLinkConfig::NMS; + } else if(pvt->jkey == "local") { + pvt->local = false; // alias for local:false + } else if(pvt->debug) { + printf("pva link parsing unknown none depth=%u key=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str()); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_bool(jlink *pjlink, int val) +{ + TRY { +// TRACE(<jkey<<" "<<(val?"true":"false")); + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "proc") { + pvt->proc = val ? pvaLinkConfig::PP : pvaLinkConfig::NPP; + } else if(pvt->jkey == "sevr") { + pvt->sevr = val ? pvaLinkConfig::MS : pvaLinkConfig::NMS; + } else if(pvt->jkey == "defer") { + pvt->defer = !!val; + } else if(pvt->jkey == "pipeline") { + pvt->pipeline = !!val; + } else if(pvt->jkey == "time") { + pvt->time = !!val; + } else if(pvt->jkey == "retry") { + pvt->retry = !!val; + } else if(pvt->jkey == "local") { + pvt->local = !!val; + } else if(pvt->jkey == "always") { + pvt->always = !!val; + } else if(pvt->debug) { + printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%s\n", + pvt->parseDepth, pvt->jkey.c_str(), val ? "true" : "false"); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_integer(jlink *pjlink, long long val) +{ + TRY { + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "Q") { + pvt->queueSize = val < 1 ? 
1 : size_t(val); + } else if(pvt->jkey == "monorder") { + pvt->monorder = std::max(-1024, std::min(int(val), 1024)); + } else if(pvt->debug) { + printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%lld\n", + pvt->parseDepth, pvt->jkey.c_str(), val); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_string(jlink *pjlink, const char *val, size_t len) +{ + TRY{ + std::string sval(val, len); + if(pvt->parseDepth==0 || (pvt->parseDepth==1 && pvt->jkey=="pv")) { + pvt->channelName = sval; + + } else if(pvt->parseDepth > 1) { + // ignore + + } else if(pvt->jkey=="field") { + pvt->fieldName = sval; + + } else if(pvt->jkey=="proc") { + if(sval.empty()) { + pvt->proc = pvaLinkConfig::Default; + } else if(sval=="CP") { + pvt->proc = pvaLinkConfig::CP; + } else if(sval=="CPP") { + pvt->proc = pvaLinkConfig::CPP; + } else if(sval=="PP") { + pvt->proc = pvaLinkConfig::PP; + } else if(sval=="NPP") { + pvt->proc = pvaLinkConfig::NPP; + } else if(pvt->debug) { + printf("pva link parsing unknown proc depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + } else if(pvt->jkey=="sevr") { + if(sval=="NMS") { + pvt->sevr = pvaLinkConfig::NMS; + } else if(sval=="MS") { + pvt->sevr = pvaLinkConfig::MS; + } else if(sval=="MSI") { + pvt->sevr = pvaLinkConfig::MSI; + } else if(sval=="MSS") { + // not sure how to handle mapping severity for MSS. + // leave room for this to happen compatibly later by + // handling as alias for MS until then. + pvt->sevr = pvaLinkConfig::MS; + } else if(pvt->debug) { + printf("pva link parsing unknown sevr depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + } else if(pvt->debug) { + printf("pva link parsing unknown string depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_key_result pva_parse_start_map(jlink *pjlink) +{ + TRY { + return jlif_key_continue; + }CATCH(jlif_key_stop) +} + +jlif_result pva_parse_key_map(jlink *pjlink, const char *key, size_t len) +{ + TRY { + std::string sval(key, len); + pvt->jkey = sval; + + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_end_map(jlink *pjlink) +{ + TRY { + return jlif_continue; + }CATCH(jlif_stop) +} + +struct lset* pva_get_lset(const jlink *pjlink) +{ + return &pva_lset; +} + +void pva_report(const jlink *rpjlink, int lvl, int indent) +{ + const pvaLink *pval = static_cast(rpjlink); + try { + (void)pval; + printf("%*s'pva': %s", indent, "", pval->channelName.c_str()); + if(!pval->fieldName.empty()) + printf("|.%s", pval->fieldName.c_str()); + + switch(pval->proc) { + case pvaLinkConfig::NPP: printf(" NPP"); break; + case pvaLinkConfig::Default: printf(" Def"); break; + case pvaLinkConfig::PP: printf(" PP"); break; + case pvaLinkConfig::CP: printf(" CP"); break; + case pvaLinkConfig::CPP: printf(" CPP"); break; + } + switch(pval->sevr) { + case pvaLinkConfig::NMS: printf(" NMS"); break; + case pvaLinkConfig::MS: printf(" MS"); break; + case pvaLinkConfig::MSI: printf(" MSI"); break; + } + if(lvl>0) { + printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d", + unsigned(pval->queueSize), + pval->pipeline ? 'T' : 'F', + pval->defer ? 'T' : 'F', + pval->time ? 'T' : 'F', + pval->retry ? 
'T' : 'F', + pval->monorder); + } + + if(pval->lchan) { + // after open() + Guard G(pval->lchan->lock); + + printf(" conn=%c", pval->lchan->state == pvaLinkChannel::Connected ? 'T' : 'F'); + if(pval->lchan->op_put) { + printf(" Put"); + } + + if(lvl>0) { + printf(" #disconn=%zu prov=%s", pval->lchan->num_disconnect, pval->lchan->providerName.c_str()); + } +// if(lvl>5) { +// std::ostringstream strm; +// pval->lchan->chan.show(strm); +// printf("\n%*s CH: %s", indent, "", strm.str().c_str()); +// } + } else { + printf(" No Channel"); + } + printf("\n"); + }CATCH() +} + +} //namespace + +jlif lsetPVA = { + "pva", + &pva_alloc_jlink, + &pva_free_jlink, + &pva_parse_null, + &pva_parse_bool, + &pva_parse_integer, + NULL, + &pva_parse_string, + &pva_parse_start_map, + &pva_parse_key_map, + &pva_parse_end_map, + NULL, + NULL, + NULL, + &pva_get_lset, + &pva_report, + NULL +}; + +} //namespace pvalink + +extern "C" { +using pvxlink::lsetPVA; +epicsExportAddress(jlif, lsetPVA); +} diff --git a/ioc/pvalink_link.cpp b/ioc/pvalink_link.cpp new file mode 100644 index 000000000..817197642 --- /dev/null +++ b/ioc/pvalink_link.cpp @@ -0,0 +1,149 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include + +#include "pvalink.h" + +DEFINE_LOGGER(_logger, "ioc.pvalink.link"); + +namespace pvxlink { + +pvaLink::pvaLink() +{ + //TODO: valgrind tells me these aren't initialized by Base, but probably should be. + parseDepth = 0; + parent = 0; +} + +pvaLink::~pvaLink() +{ + alive = false; + + if(lchan) { // may be NULL if parsing fails + Guard G(lchan->lock); + + lchan->links.erase(this); + lchan->links_changed = true; + + bool new_debug = false; + for(pvaLinkChannel::links_t::const_iterator it(lchan->links.begin()), end(lchan->links.end()) + ; it!=end; ++it) + { + const pvaLink *pval = *it; + if(pval->debug) { + new_debug = true; + break; + } + } + + lchan->debug = new_debug; + } +} + +Value pvaLink::makeRequest() +{ + // TODO: cache TypeDef in global + using namespace pvxs::members; + return TypeDef(TypeCode::Struct, { + Struct("field", {}), + Struct("record", { + Struct("_options", { + Bool("pipeline"), + Bool("atomic"), + UInt32("queueSize"), + }), + }), + }).create() + .update("record._options.pipeline", pipeline) + .update("record._options.atomic", true) + .update("record._options.queueSize", uint32_t(queueSize)); +} + +// caller must lock lchan->lock +bool pvaLink::valid() const +{ + return lchan->state == pvaLinkChannel::Connected && lchan->root; +} + +// caller must lock lchan->lock +Value pvaLink::getSubField(const char *name) +{ + Value ret; + if(valid()) { + if(fieldName.empty()) { + // we access the top level struct + ret = lchan->root[name]; + + } else { + // we access a sub-struct + ret = lchan->root[fieldName]; + if(!ret) { + // noop + } else if(ret.type()!=TypeCode::Struct) { + // addressed sub-field isn't a sub-structure + if(strcmp(name, "value")!=0) { + // unless we are trying to fetch the "value", we fail here + ret = Value(); + } + } else { + ret = ret[name]; + } + } + } + return ret; +} + +// call with channel lock held +void pvaLink::onDisconnect() +{ + log_debug_printf(_logger, "%s disconnect\n", plink->precord->name); + // TODO: option to remain queue'd while disconnected + + used_queue = used_scratch = false; +} + +void pvaLink::onTypeChange() +{ + log_debug_printf(_logger, "%s type change\n", 
plink->precord->name); + + assert(lchan->state == pvaLinkChannel::Connected && lchan->root); // we should only be called when connected + + fld_value = getSubField("value"); + fld_seconds = getSubField("timeStamp.secondsPastEpoch"); + fld_nanoseconds = getSubField("timeStamp.nanoseconds"); + fld_severity = getSubField("alarm.severity"); + fld_display = getSubField("display"); + fld_control = getSubField("control"); + fld_valueAlarm = getSubField("valueAlarm"); + + // build mask of all "changed" bits associated with our .value + // CP/CPP input links will process this link only for updates where + // the changed mask and proc_changed share at least one set bit. +// if(fld_value) { +// // bit for this field +// proc_changed.set(fld_value->getFieldOffset()); + +// // bits of all parent fields +// for(const pvd::PVStructure* parent = fld_value->getParent(); parent; parent = parent->getParent()) { +// proc_changed.set(parent->getFieldOffset()); +// } + +// if(fld_value->getField()->getType()==pvd::structure) +// { +// // bits of all child fields +// const pvd::PVStructure *val = static_cast(fld_value.get()); +// for(size_t i=val->getFieldOffset(), N=val->getNextFieldOffset(); i +#include +#include + +#include +#include "dbentry.h" +#include "pvalink.h" +#include "utilpvt.h" + +#include // redirect stdout/stderr; include after libevent/util.h + +DEFINE_LOGGER(_logger, "pvxs.pvalink.lset"); + +namespace pvxlink { +namespace { +using namespace pvxs; + +#define TRY pvaLink *self = static_cast(plink->value.json.jlink); assert(self->alive); try +#define CATCH() catch(std::exception& e) { \ + errlogPrintf("pvaLink %s fails %s: %s\n", __func__, plink->precord->name, e.what()); \ +} + +#define CHECK_VALID() if(!self->valid()) { log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); return -1;} + +dbfType getLinkType(DBLINK *plink) +{ + ioc::DBEntry ent(plink->precord); + + for(long status = dbFirstField(ent, 0); !status; status = dbNextField(ent, 0)) { + if(ent->pfield==plink) + return ent->pflddes->field_type; + } + throw std::logic_error("DBLINK* corrupt"); +} + +void pvaOpenLink(DBLINK *plink) +{ + try { + pvaLink* self((pvaLink*)plink->value.json.jlink); + self->type = getLinkType(plink); + + // workaround for Base not propagating info(base:lsetDebug to us + { + ioc::DBEntry rec(plink->precord); + + if(epicsStrCaseCmp(rec.info("base:lsetDebug", "NO"), "YES")==0) { + self->debug = 1; + } + } + + log_debug_printf(_logger, "%s OPEN %s\n", plink->precord->name, self->channelName.c_str()); + + // still single threaded at this point. + // also, no pvaLinkChannel::lock yet + + self->plink = plink; + + if(self->channelName.empty()) + return; // nothing to do... 
+ + auto pvRequest(self->makeRequest()); + pvaGlobal_t::channels_key_t key = std::make_pair(self->channelName, std::string(SB()< chan; + bool doOpen = false; + { + Guard G(pvaGlobal->lock); + + pvaGlobal_t::channels_t::iterator it(pvaGlobal->channels.find(key)); + + if(it!=pvaGlobal->channels.end()) { + // re-use existing channel + chan = it->second.lock(); + } + + if(!chan) { + // open new channel + + chan.reset(new pvaLinkChannel(key, pvRequest)); + chan->AP->lc = chan; + pvaGlobal->channels.insert(std::make_pair(key, chan)); + doOpen = true; + } + + doOpen &= pvaGlobal->running; // if not running, then open from initHook + } + + if(doOpen) { + chan->open(); // start subscription + } + + if(!self->local || chan->providerName=="QSRV"){ + Guard G(chan->lock); + + chan->links.insert(self); + chan->links_changed = true; + + self->lchan.swap(chan); // we are now attached + + self->lchan->debug |= !!self->debug; + } else { + // TODO: only print duing iocInit()? + fprintf(stderr, "%s Error: local:true link to '%s' can't be fulfilled\n", + plink->precord->name, self->channelName.c_str()); + plink->lset = NULL; + } + + return; + }CATCH() + // on error, prevent any further calls to our lset functions + plink->lset = NULL; +} + +void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink) +{ + try { + std::unique_ptr self((pvaLink*)plink->value.json.jlink); + log_debug_printf(_logger, "%s: %s %s\n", __func__, plink->precord->name, self->channelName.c_str()); + assert(self->alive); + + }CATCH() +} + +int pvaIsConnected(const DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + + bool ret = self->valid(); + log_debug_printf(_logger, "%s: %s %s\n", __func__, plink->precord->name, self->channelName.c_str()); + return ret; + + }CATCH() + return 0; +} + +int pvaGetDBFtype(const DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + // if fieldName is empty, use top struct value + // if fieldName not empty + // if sub-field is struct, use sub-struct .value + // if sub-field not struct, treat as value + + auto value(self->getSubField("value")); + auto vtype(value.type()); + if(vtype.isarray()) + vtype = vtype.scalarOf(); + + switch(value.type().code) { + case TypeCode::Int8: return DBF_CHAR; + case TypeCode::Int16: return DBF_SHORT; + case TypeCode::Int32: return DBF_LONG; + case TypeCode::Int64: return DBF_INT64; + case TypeCode::UInt8: return DBF_UCHAR; + case TypeCode::UInt16: return DBF_USHORT; + case TypeCode::UInt32: return DBF_ULONG; + case TypeCode::UInt64: return DBF_UINT64; + case TypeCode::Float32: return DBF_FLOAT; + case TypeCode::Float64: return DBF_DOUBLE; + case TypeCode::String: return DBF_STRING; + case TypeCode::Struct: { + if(value.id()=="enum_t" + && value["index"].type().kind()==Kind::Integer + && value["choices"].type()==TypeCode::StringA) + return DBF_ENUM; + } + // fall through + default: + return DBF_LONG; // default for un-mapable types. 
+ } + + }CATCH() + return -1; +} + +long pvaGetElements(const DBLINK *plink, long *nelements) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + shared_array arr; + if(!self->fld_value.type().isarray()) { + *nelements = 1; + } else if(self->fld_value.as(arr)) { + *nelements = arr.size(); + } + return 0; + }CATCH() + return -1; +} + +long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, + long *pnRequest) +{ + TRY { + Guard G(self->lchan->lock); + + if(!self->valid()) { + // disconnected + if(self->sevr != pvaLink::NMS) { + recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity); + } + // TODO: better capture of disconnect time + epicsTimeGetCurrent(&self->snap_time); + if(self->time) { + plink->precord->time = self->snap_time; + } + log_debug_printf(_logger, "%s: %s not valid", __func__, self->channelName.c_str()); + return -1; + } + + auto nReq(pnRequest ? *pnRequest : 1); + auto value(self->fld_value); + + if(value.type()==TypeCode::Any) + value = value.lookup("->"); + + if(nReq <= 0 || !value) { + if(!pnRequest) { + // TODO: fill in dummy scalar + nReq = 1; + } + + } else if(value.type().isarray()) { + auto arr(value.as>()); + + if(size_t(nReq) < arr.size()) + nReq = arr.size(); + + if(arr.original_type()==ArrayType::String) { + auto sarr(arr.castTo()); + + if(dbrType==DBR_STRING) { + auto cbuf(reinterpret_cast(pbuffer)); + for(size_t i : range(size_t(nReq))) { + strncpy(cbuf + i*MAX_STRING_SIZE, + sarr[i].c_str(), + MAX_STRING_SIZE-1u); + cbuf[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0'; + } + } else { + return S_db_badDbrtype; // TODO: allow implicit parse? + } + + } else { + ArrayType dtype; + switch(dbrType) { + case DBR_CHAR: dtype = ArrayType::Int8; break; + case DBR_SHORT: dtype = ArrayType::Int16; break; + case DBR_LONG: dtype = ArrayType::Int32; break; + case DBR_INT64: dtype = ArrayType::Int64; break; + case DBR_UCHAR: dtype = ArrayType::UInt8; break; + case DBR_USHORT: dtype = ArrayType::UInt16; break; + case DBR_ULONG: dtype = ArrayType::UInt32; break; + case DBR_UINT64: dtype = ArrayType::UInt64; break; + case DBR_FLOAT: dtype = ArrayType::Float32; break; + case DBR_DOUBLE: dtype = ArrayType::Float64; break; + default: + return S_db_badDbrtype; + } + + detail::convertArr(dtype, pbuffer, + arr.original_type(), arr.data(), + size_t(nReq)); + } + + } else { // scalar + // TODO: special case for "long string" + + if(value.type()==TypeCode::Struct && self->fld_value.id()=="enum_t") { // NTEnum + auto index(value["index"].as()); + switch(dbrType) { + case DBR_CHAR: *reinterpret_cast(pbuffer) = index; break; + case DBR_SHORT: *reinterpret_cast(pbuffer) = index; break; + case DBR_LONG: *reinterpret_cast(pbuffer) = index; break; + case DBR_INT64: *reinterpret_cast(pbuffer) = index; break; + case DBR_UCHAR: *reinterpret_cast(pbuffer) = index; break; + case DBR_USHORT: *reinterpret_cast(pbuffer) = index; break; + case DBR_ULONG: *reinterpret_cast(pbuffer) = index; break; + case DBR_UINT64: *reinterpret_cast(pbuffer) = index; break; + case DBR_FLOAT: *reinterpret_cast(pbuffer) = index; break; + case DBR_DOUBLE: *reinterpret_cast(pbuffer) = index; break; + case DBR_STRING: { + auto cbuf(reinterpret_cast(pbuffer)); + auto choices(value["choices"].as>()); + if(index>=0 && size_t(index) < choices.size()) { + auto& choice(choices[index]); + strncpy(cbuf, choice.c_str(), MAX_STRING_SIZE-1u); + + } else { + epicsSnprintf(cbuf, MAX_STRING_SIZE-1u, "%u", unsigned(index)); + } + cbuf[MAX_STRING_SIZE-1u] = '\0'; + break; + } + default: + return S_db_badDbrtype; + } + + 
} else { // plain scalar + switch(dbrType) { + case DBR_CHAR: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_SHORT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_LONG: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_INT64: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_UCHAR: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_USHORT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_ULONG: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_UINT64: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_FLOAT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_DOUBLE: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_STRING: { + auto cbuf(reinterpret_cast(pbuffer)); + auto sval(value.as()); + strncpy(cbuf, sval.c_str(), MAX_STRING_SIZE-1u); + cbuf[MAX_STRING_SIZE-1u] = '\0'; + break; + } + default: + return S_db_badDbrtype; + } + } + nReq = 1; + } + + if(pnRequest) + *pnRequest = nReq; + + if(self->fld_seconds) { + self->snap_time.secPastEpoch = self->fld_seconds.as() - POSIX_TIME_AT_EPICS_EPOCH; + if(self->fld_nanoseconds) { + self->snap_time.nsec = self->fld_nanoseconds.as(); + } else { + self->snap_time.nsec = 0u; + } + } else { + self->snap_time.secPastEpoch = 0u; + self->snap_time.nsec = 0u; + } + + if(self->fld_severity) { + self->snap_severity = self->fld_severity.as(); + } else { + self->snap_severity = NO_ALARM; + } + + if((self->snap_severity!=NO_ALARM && self->sevr == pvaLink::MS) || + (self->snap_severity==INVALID_ALARM && self->sevr == pvaLink::MSI)) + { + recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity); + } + + if(self->time) { + plink->precord->time = self->snap_time; + } + + log_debug_printf(_logger, "%s: %s %s OK\n", __func__, plink->precord->name, self->channelName.c_str()); + return 0; + }CATCH() + return -1; +} + +long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(self->fld_control) { + Value value; + if(lo) { + if(!self->fld_control["limitLow"].as(*lo)) + *lo = 0.0; + } + if(hi) { + if(!self->fld_control["limitHigh"].as(*hi)) + *hi = 0.0; + } + } else { + *lo = *hi = 0.0; + } + log_debug_printf(_logger, "%s: %s %s %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(self->fld_display) { + Value value; + if(lo) { + if(!self->fld_display["limitLow"].as(*lo)) + *lo = 0.0; + } + if(hi) { + if(!self->fld_display["limitHigh"].as(*hi)) + *hi = 0.0; + } + } else { + *lo = *hi = 0.0; + } + log_debug_printf(_logger, "%s: %s %s %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo, + double *hi, double *hihi) +{ + TRY { + //Guard G(self->lchan->lock); + //CHECK_VALID(); + *lolo = *lo = *hi = *hihi = 0.0; + log_debug_printf(_logger, "%s: %s %s %f %f %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), + lo ? *lo : 0, lolo ? *lolo : 0, hi ? *hi : 0, hihi ? *hihi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetPrecision(const DBLINK *plink, short *precision) +{ + TRY { + //Guard G(self->lchan->lock); + //CHECK_VALID(); + + // No sane way to recover precision from display.format string. 
+ *precision = 0; + log_debug_printf(_logger, "%s: %s %s %i\n", __func__, plink->precord->name, self->channelName.c_str(), *precision); + return 0; + }CATCH() + return -1; +} + +long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(unitsSize==0) return 0; + + std::string egu; + if(units && self->fld_display.as(egu)) { + strncpy(units, egu.c_str(), unitsSize-1u); + units[unitsSize-1u] = '\0'; + } else if(units) { + units[0] = '\0'; + } + units[unitsSize-1] = '\0'; + log_debug_printf(_logger, "%s: %s %s %s\n", __func__, plink->precord->name, self->channelName.c_str(), units); + return 0; + }CATCH() + return -1; +} + +long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, + epicsEnum16 *severity) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(severity) { + *severity = self->snap_severity; + } + if(status) { + *status = self->snap_severity ? LINK_ALARM : NO_ALARM; + } + log_debug_printf(_logger, "%s: %s %s %i %i\n", + __func__, plink->precord->name, self->channelName.c_str(), severity ? *severity : 0, status ? *status : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(pstamp) { + *pstamp = self->snap_time; + } + log_debug_printf(_logger, "%s: %s %s %i %i\n", __func__, plink->precord->name, self->channelName.c_str(), pstamp ? pstamp->secPastEpoch : 0, pstamp ? pstamp->nsec : 0); + return 0; + }CATCH() + return -1; +} + +long pvaPutValueX(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest, bool wait) +{ + TRY { + (void)self; + Guard G(self->lchan->lock); + + if(nRequest < 0) return -1; + + if(!self->retry && !self->valid()) { + log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); + return -1; + } + + shared_array buf; + + if(dbrType == DBF_STRING) { + const char *sbuffer = (const char*)pbuffer; + shared_array sval(nRequest); + + for(long n=0; nput_scratch = sval.freeze().castTo(); + + } else { + ArrayType dtype; + switch(dbrType) { + case DBR_CHAR: dtype = ArrayType::Int8; break; + case DBR_SHORT: dtype = ArrayType::Int16; break; + case DBR_LONG: dtype = ArrayType::Int32; break; + case DBR_INT64: dtype = ArrayType::Int64; break; + case DBR_UCHAR: dtype = ArrayType::UInt8; break; + case DBR_USHORT: dtype = ArrayType::UInt16; break; + case DBR_ULONG: dtype = ArrayType::UInt32; break; + case DBR_UINT64: dtype = ArrayType::UInt64; break; + case DBR_FLOAT: dtype = ArrayType::Float32; break; + case DBR_DOUBLE: dtype = ArrayType::Float64; break; + default: + return S_db_badDbrtype; + } + + auto val(detail::copyAs(dtype, dtype, pbuffer, size_t(nRequest))); + + self->put_scratch = val.freeze().castTo(); + } + + self->used_scratch = true; + +#ifdef USE_MULTILOCK + if(wait) + self->lchan->after_put.insert(plink->precord); +#endif + + if(!self->defer) self->lchan->put(); + + log_debug_printf(_logger, "%s: %s %s %s\n", __func__, plink->precord->name, self->channelName.c_str(), self->lchan->root.valid() ? 
"valid": "not valid"); + + return 0; + }CATCH() + return -1; +} + +long pvaPutValue(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, false); +} + +long pvaPutValueAsync(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, true); +} + +void pvaScanForward(DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + + if(!self->retry && !self->valid()) { + return; + } + + // FWD_LINK is never deferred, and always results in a Put + self->lchan->put(true); + + log_debug_printf(_logger, "%s: %s %s %s\n", + __func__, plink->precord->name, self->channelName.c_str(), self->lchan->root.valid() ? "valid": "not valid"); + }CATCH() +} + +#undef TRY +#undef CATCH + +} //namespace + +lset pva_lset = { + 0, 1, // non-const, volatile + &pvaOpenLink, + &pvaRemoveLink, + NULL, NULL, NULL, + &pvaIsConnected, + &pvaGetDBFtype, + &pvaGetElements, + &pvaGetValue, + &pvaGetControlLimits, + &pvaGetGraphicLimits, + &pvaGetAlarmLimits, + &pvaGetPrecision, + &pvaGetUnits, + &pvaGetAlarm, + &pvaGetTimeStamp, + &pvaPutValue, + &pvaPutValueAsync, + &pvaScanForward + //&pvaReportLink, +}; + +} // namespace pvxlink diff --git a/ioc/pvxs/iochooks.h b/ioc/pvxs/iochooks.h index ca090dbd3..6d8e8c3b7 100644 --- a/ioc/pvxs/iochooks.h +++ b/ioc/pvxs/iochooks.h @@ -100,5 +100,14 @@ void testPrepare(); PVXS_IOC_API void testShutdown(); +PVXS_IOC_API +void testqsrvWaitForLinkEvent(struct link *plink); + +PVXS_IOC_API +void testqsrvShutdownOk(void); + +PVXS_IOC_API +void testqsrvCleanup(void); + }} // namespace pvxs::ioc #endif // PVXS_IOCHOOKS_H diff --git a/ioc/pvxsIoc.dbd b/ioc/pvxs3x.dbd similarity index 100% rename from ioc/pvxsIoc.dbd rename to ioc/pvxs3x.dbd diff --git a/ioc/pvxs7x.dbd b/ioc/pvxs7x.dbd new file mode 100644 index 000000000..e0ec7a6b3 --- /dev/null +++ b/ioc/pvxs7x.dbd @@ -0,0 +1,11 @@ +registrar(pvxsBaseRegistrar) +registrar(pvxsSingleSourceRegistrar) +registrar(pvxsGroupSourceRegistrar) +registrar(installPVAAddLinkHook) +link("pva", "lsetPVA") + +# from demo.cpp +device(waveform, CONSTANT, devWfPDBQ2Demo, "QSRV2 Demo") +device(longin, CONSTANT, devLoPDBQ2UTag, "QSRV2 Set UTag") +# from imagedemo.c +function(QSRV2_image_demo) diff --git a/test/Makefile b/test/Makefile index 2abca671e..47714460a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -11,6 +11,7 @@ include $(TOP)/configure/CONFIG_PVXS_VERSION # access to private headers USR_CPPFLAGS += -I$(TOP)/src +USR_CPPFLAGS += -I$(TOP)/ioc PROD_LIBS = pvxs Com @@ -120,6 +121,12 @@ ifdef BASE_7_0 TESTPROD_HOST += benchdata benchdata_SRCS += benchdata.cpp +TESTPROD_HOST += testpvalink +testpvalink_SRCS += testpvalink.cpp +testpvalink_SRCS += testioc_registerRecordDeviceDriver.cpp +testpvalink_LIBS += pvxsIoc pvxs $(EPICS_BASE_IOC_LIBS) +TESTS += testpvalink + endif ifdef BASE_3_15 diff --git a/test/testioc.h b/test/testioc.h index 39f199402..05a5b3a3e 100644 --- a/test/testioc.h +++ b/test/testioc.h @@ -33,6 +33,7 @@ class TestIOC { if(running) { pvxs::ioc::testShutdown(); testIocShutdownOk(); + running = false; } } ~TestIOC() { diff --git a/test/testpvalink.cpp b/test/testpvalink.cpp new file mode 100644 index 000000000..1a7b4ea2b --- /dev/null +++ b/test/testpvalink.cpp @@ -0,0 +1,218 @@ + +#include +#include +#include + +//#include +//#include "utilities.h" +#include "dblocker.h" +#include "pvxs/iochooks.h" +#include "pvalink.h" +#include "testioc.h" +//#include "pv/qsrv.h" + +using namespace pvxs::ioc; +using 
namespace pvxs; + +namespace +{ + void testGet() + { + testDiag("==== testGet ===="); + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + while (!dbIsLinkConnected(&i1->inp)) + testqsrvWaitForLinkEvent(&i1->inp); + + testdbGetFieldEqual("target:i.VAL", DBF_LONG, 42L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 0L); // value before first process + + testdbGetFieldEqual("src:i1.INP", DBF_STRING, "{\"pva\":\"target:i\"}"); + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 42L); + + testdbPutFieldOk("src:i1.INP", DBF_STRING, "{\"pva\":\"target:ai\"}"); + + while (!dbIsLinkConnected(&i1->inp)) + testqsrvWaitForLinkEvent(&i1->inp); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 42L); // changing link doesn't automatically process + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); // now it's changed + } + + void testFieldLinks() { + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testDiag("==== Test field links ===="); + + std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"field\":\"display.precision\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + + while (!dbIsLinkConnected(&i1->inp)) + testqsrvWaitForLinkEvent(&i1->inp); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); // changing link doesn't automatically process + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 2L); // changing link doesn't automatically process + + } + + void testProc() + { + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testDiag("==== Test proc settings ===="); + + // Set it to CPP + std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"proc\":\"CPP\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + + while (!dbIsLinkConnected(&i1->inp)) + testqsrvWaitForLinkEvent(&i1->inp); + + // Link should read old value again + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); + + testdbPutFieldOk("target:ai", DBF_FLOAT, 5.0); + + // We are already connected at this point, wait for the update. 
+        testqsrvWaitForLinkEvent(&i1->inp);
+
+        // now it's changed
+        testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 5L);
+    }
+
+    void testSevr()
+    {
+        longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1");
+
+        testDiag("==== Test severity forwarding (NMS, MS, MSI) ====");
+
+        std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"NMS\"}}";
+        testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str());
+
+        while (!dbIsLinkConnected(&i1->inp))
+            testqsrvWaitForLinkEvent(&i1->inp);
+
+        testdbPutFieldOk("target:ai.LOLO", DBF_FLOAT, 5.0);
+        testdbPutFieldOk("target:ai.LLSV", DBF_STRING, "MAJOR");
+        testdbPutFieldOk("target:ai", DBF_FLOAT, 0.0);
+
+        testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L);
+        testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevNone);
+
+        pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"MS\"}}";
+        testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str());
+
+        while (!dbIsLinkConnected(&i1->inp))
+            testqsrvWaitForLinkEvent(&i1->inp);
+
+        testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L);
+        testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevMajor);
+
+        pv_name = "{\"pva\":{\"pv\":\"target:mbbi\",\"sevr\":\"MSI\"}}";
+        testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str());
+
+        while (!dbIsLinkConnected(&i1->inp))
+            testqsrvWaitForLinkEvent(&i1->inp);
+
+        testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L);
+        testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevNone);
+
+        testdbPutFieldOk("target:ai", DBF_FLOAT, 1.0);
+        testqsrvWaitForLinkEvent(&i1->inp);
+        testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L);
+        testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevInvalid);
+    }
+
+    void testPut()
+    {
+        testDiag("==== testPut ====");
+
+        longoutRecord *o2 = (longoutRecord *)testdbRecordPtr("src:o2");
+
+        while (!dbIsLinkConnected(&o2->out))
+            testqsrvWaitForLinkEvent(&o2->out);
+
+        testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 43L);
+        testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 0L);
+        testdbGetFieldEqual("src:o2.OUT", DBF_STRING, "{\"pva\":\"target:i2\"}");
+
+        testdbPutFieldOk("src:o2.VAL", DBF_LONG, 14L);
+
+        // TODO: This test will only be implemented after the pva link puts work.
+        //testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 14L);
+        testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 14L);
+    }
+
+    void testPutAsync()
+    {
+#ifdef USE_MULTILOCK
+        testDiag("==== testPutAsync ====");
+
+        longoutRecord *trig = (longoutRecord *)testdbRecordPtr("async:trig");
+
+        while (!dbIsLinkConnected(&trig->out))
+            testqsrvWaitForLinkEvent(&trig->out);
+
+        testMonitor *done = testMonitorCreate("async:after", DBE_VALUE, 0);
+
+        testdbPutFieldOk("async:trig.PROC", DBF_LONG, 1);
+        testMonitorWait(done);
+
+        testdbGetFieldEqual("async:trig", DBF_LONG, 1);
+        testdbGetFieldEqual("async:slow", DBF_LONG, 1); // pushed from async:trig
+        testdbGetFieldEqual("async:slow2", DBF_LONG, 2);
+        testdbGetFieldEqual("async:after", DBF_LONG, 3);
+
+#else
+        testSkip(5, "Not USE_MULTILOCK");
+#endif
+    }
+
+} // namespace
+
+extern "C" void testioc_registerRecordDeviceDriver(struct dbBase *);
+
+MAIN(testpvalink)
+{
+    testPlan(37);
+    testSetup();
+
+    try
+    {
+        TestIOC IOC;
+
+        testdbReadDatabase("testioc.dbd", NULL, NULL);
+        testioc_registerRecordDeviceDriver(pdbbase);
+        testdbReadDatabase("testpvalink.db", NULL, NULL);
+
+        IOC.init();
+        testGet();
+        testFieldLinks();
+        testProc();
+        testSevr();
+        testPut();
+        (void)testPutAsync;
+        testqsrvShutdownOk();
+        IOC.shutdown();
+        testqsrvCleanup();
+    }
+    catch (std::exception &e)
+    {
+        testFail("Unexpected exception: %s", e.what());
+    }
+    // call epics atexits explicitly as workaround for c++ static dtor issues...
+    epicsExit(testDone());
+}
diff --git a/test/testpvalink.db b/test/testpvalink.db
new file mode 100644
index 000000000..97dfae93c
--- /dev/null
+++ b/test/testpvalink.db
@@ -0,0 +1,63 @@
+
+# used by testGet(), testFieldLinks, testProc, testSevr
+record(longin, "target:i") {
+    field(VAL, "42")
+}
+record(ai, "target:ai") {
+    field(VAL, "4.0")
+    field(FLNK, "target:mbbi")
+    field(PREC, "2")
+}
+
+record(longin, "src:i1") {
+    field(INP, {"pva":"target:i"})
+}
+
+record(mbbi, "target:mbbi") {
+    field(INP, "target:ai")
+    field(ZRSV, "NO_ALARM")
+    field(ONSV, "INVALID")
+}
+
+# used by testPut()
+record(longin, "target:i2") {
+    field(VAL, "43")
+}
+
+record(longout, "src:o2") {
+    field(OUT, {"pva":"target:i2"})
+}
+
+# used by testPutAsync()
+record(calc, "async:seq") {
+    field(CALC, "VAL+1")
+    field(VAL , "0")
+    field(TPRO, "1")
+}
+
+record(longout, "async:trig") {
+    field(OMSL, "closed_loop")
+    field(DOL , "async:seq PP")
+    field(DTYP, "Async Soft Channel")
+    field(OUT , { "pva":{"pv":"async:slow.A", "proc":true} })
+    field(FLNK, "async:after")
+    field(TPRO, "1")
+}
+
+record(calcout, "async:slow") {
+    field(ODLY, "1")
+    field(CALC, "A")
+    field(FLNK, "async:slow2")
+    field(TPRO, "1")
+}
+
+record(longin, "async:slow2") {
+    field(INP , "async:seq PP")
+    field(TPRO, "1")
+}
+
+record(longin, "async:after") {
+    field(INP , "async:seq PP")
+    field(MDEL, "-1")
+    field(TPRO, "1")
+}
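
For reference, the "pva" link forms exercised at runtime by testpvalink.cpp (via INP$ puts) can also be written as static fields in a database file. The snippet below is an illustrative sketch only and is not part of this change set: the demo:* record names are hypothetical, while target:i and target:ai refer to the records defined in testpvalink.db above; combining the options in one link object is an assumption, as the tests apply them one at a time.

# Illustrative sketch (not part of the patch): "pva" link forms from the tests,
# written directly in a database file. The demo:* names are hypothetical.
record(longin, "demo:get") {
    # shorthand form, as used by testGet(): a bare PV name reads its value
    field(INP, {"pva":"target:i"})
}
record(longin, "demo:opts") {
    # long form, combining options exercised separately by testFieldLinks(),
    # testProc() and testSevr(): sub-field selection, CPP processing,
    # and MS severity propagation
    field(INP, {"pva":{"pv":"target:ai", "field":"display.precision", "proc":"CPP", "sevr":"MS"}})
}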