diff --git a/.gitignore b/.gitignore index 293846184..3b2a40b95 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ *.orig *.log .*.swp +.vscode \ No newline at end of file diff --git a/documentation/pvalink-schema-0.json b/documentation/pvalink-schema-0.json new file mode 100644 index 000000000..0f0410235 --- /dev/null +++ b/documentation/pvalink-schema-0.json @@ -0,0 +1,36 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://mdavidsaver.github.io/pvxs/pvalink-schema-0.json", + "title": "PVA Link schema", + "type": ["string", "object"], + "properties": { + "pv": { "type": "string" }, + "field": { + "type": "string", + "default": "value" + }, + "Q": { + "type": "integer", + "default": 4 + }, + "proc": { + "type": ["boolean", "string", "null"], + "enum": [true, false, null, "", "NPP", "PP", "CP", "CPP"], + "default": null + }, + "sevr": { + "type": ["boolean", "string"], + "enum": [true, false, "NMS", "MS", "MSI", "MSS"], + "default": "NMS" + }, + "time": { "type": "boolean", "default": false }, + "monorder": { "type": "integer", "default": 0 }, + "defer": { "type": "boolean", "default": false }, + "retry": { "type": "boolean", "default": false }, + "pipeline": { "type": "boolean", "default": false }, + "always": { "type": "boolean", "default": false }, + "atomic": { "type": "boolean", "default": false }, + "local": { "type": "boolean", "default": false } + }, + "additionalProperties": false +} diff --git a/ioc/Makefile b/ioc/Makefile index 4c0cea824..8366e9c1e 100644 --- a/ioc/Makefile +++ b/ioc/Makefile @@ -57,8 +57,13 @@ pvxsIoc_SRCS += groupconfigprocessor.cpp pvxsIoc_SRCS += groupprocessorcontext.cpp pvxsIoc_SRCS += groupsource.cpp pvxsIoc_SRCS += groupsourcehooks.cpp +pvxsIoc_SRCS += pvalink.cpp +pvxsIoc_SRCS += pvalink_channel.cpp +pvxsIoc_SRCS += pvalink_jlif.cpp +pvxsIoc_SRCS += pvalink_link.cpp +pvxsIoc_SRCS += pvalink_lset.cpp -else # BASE_7_0 +else pvxsIoc_SRCS += dummygroup.cpp @@ -84,3 +89,10 @@ include $(TOP)/configure/RULES_PVXS_MODULE #---------------------------------------- # ADD RULES AFTER THIS LINE +ifdef BASE_7_0 +../O.Common/pvxsIoc.dbd: ../pvxs7x.dbd + $(CP) $< $@ +else +../O.Common/pvxsIoc.dbd: ../pvxs3x.dbd + $(CP) $< $@ +endif diff --git a/ioc/groupsourcehooks.cpp b/ioc/groupsourcehooks.cpp index 9b5050b5a..bf9e16a2e 100644 --- a/ioc/groupsourcehooks.cpp +++ b/ioc/groupsourcehooks.cpp @@ -20,6 +20,7 @@ #include #include +#include "qsrvpvt.h" #include "groupsource.h" #include "groupconfigprocessor.h" #include "iocshcommand.h" @@ -161,66 +162,49 @@ long dbLoadGroup(const char* jsonFilename, const char* macros) { } } -} -} // namespace pvxs::ioc -using namespace pvxs::ioc; +void processGroups() +{ + GroupConfigProcessor processor; + epicsGuard G(processor.config.groupMapMutex); -namespace { -using namespace pvxs; + // Parse all info(Q:Group... records to configure groups + processor.loadConfigFromDb(); -/** - * Initialise qsrv database group records by adding them as sources in our running pvxs server instance - * - * @param theInitHookState the initHook state - we only want to trigger on the initHookAfterIocBuilt state - ignore all others - */ -void qsrvGroupSourceInit(initHookState theInitHookState) { - try { - if(!IOCSource::enabled()) - return; - if (theInitHookState == initHookAfterInitDatabase) { - GroupConfigProcessor processor; - epicsGuard G(processor.config.groupMapMutex); + // Load group configuration files + processor.loadConfigFiles(); - // Parse all info(Q:Group... records to configure groups - processor.loadConfigFromDb(); + // checks on groupConfigMap + processor.validateGroups(); - // Load group configuration files - processor.loadConfigFiles(); + // Configure groups + processor.defineGroups(); - // checks on groupConfigMap - processor.validateGroups(); + // Resolve triggers + processor.resolveTriggerReferences(); - // Configure groups - processor.defineGroups(); + // Create Server Groups + processor.createGroups(); +} - // Resolve triggers - processor.resolveTriggerReferences(); +void addGroupSrc() +{ + pvxs::ioc::server() + .addSource("qsrvGroup", std::make_shared(), 1); +} - // Create Server Groups - processor.createGroups(); - } else if (theInitHookState == initHookAfterIocBuilt) { - // Load group configuration from parsed groups in iocServer - pvxs::ioc::server().addSource("qsrvGroup", std::make_shared(), 1); - } - } catch(std::exception& e) { - fprintf(stderr, "ERROR: Unhandled exception in %s(%d): %s\n", - __func__, theInitHookState, e.what()); - } +void resetGroups() +{ + auto& config(IOCGroupConfig::instance()); + + // server stopped at this point, but lock anyway + epicsGuard G(config.groupMapMutex); + + config.groupMap.clear(); + config.groupConfigFiles.clear(); } -/** - * IOC pvxs Group Source registrar. This implements the required registrar function that is called by xxxx_registerRecordDeviceDriver, - * the auto-generated stub created for all IOC implementations. - *

- * It is registered by using the `epicsExportRegistrar()` macro. - *

- * 1. Register your hook handler to handle any state hooks that you want to implement. Here we install - * an `initHookState` handler connected to the `initHookAfterIocBuilt` state. It will add all of the - * group record type sources defined so far. Note that you can define sources up until the `iocInit()` call, - * after which point the `initHookAfterIocBuilt` handlers are called and will register all the defined records. - */ -void pvxsGroupSourceRegistrar() { +void group_enable() { // Register commands to be available in the IOC shell IOCShCommand("pvxgl", "[level, [pattern]]", "Group Sources list.\n" @@ -232,14 +216,6 @@ void pvxsGroupSourceRegistrar() { IOCShCommand("dbLoadGroup", "JSON file", "macros", dbLoadGroupMsg) .implementation<&dbLoadGroupCmd>(); - - initHookRegister(&qsrvGroupSourceInit); } -} // namespace - -// in .dbd file -//registrar(pvxsGroupSourceRegistrar) -extern "C" { -epicsExportRegistrar(pvxsGroupSourceRegistrar); -} +}} // namespace pvxs::ioc diff --git a/ioc/iochooks.cpp b/ioc/iochooks.cpp index 020a119ae..3291a5709 100644 --- a/ioc/iochooks.cpp +++ b/ioc/iochooks.cpp @@ -18,8 +18,12 @@ #include #include +#include #include #include +#include +#include +#include #include #include @@ -28,14 +32,18 @@ #include "iocshcommand.h" #include "utilpvt.h" +#include "qsrvpvt.h" + +#ifdef USE_QSRV_SINGLE +# include +#endif +#ifdef USE_PVA_LINKS +# include "pvalink.h" +#endif // include last to avoid clash of #define printf with other headers #include -#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 4, 0) -# define USE_DEINIT_HOOKS -#endif - namespace pvxs { namespace ioc { @@ -90,16 +98,13 @@ void initialisePvxsServer() { pvxServer->srv = std::move(newsrv); } -/** - * The function to call when we exit the IOC process. This is only installed as the callback function - * after the database has been initialized. This function will stop the pvxs server instance and destroy the - * object. - * - * @param pep - The pointer to the exit parameter list - unused - */ static -void pvxsAtExit(void*) noexcept { +void pvxsExitBeforeIocShutdown(void*) noexcept +{ try { +#ifdef USE_PVA_LINKS + linkGlobal_t::deinit(); +#endif Guard (pvxServer->lock); if(auto srv = std::move(pvxServer->srv)) { pvxServer->srv = server::Server(); @@ -112,19 +117,57 @@ void pvxsAtExit(void*) noexcept { } } -void testPrepare() +static +void pvxsExitAfterIocShutdown(void*) noexcept +{ + try { +#ifdef USE_PVA_LINKS + linkGlobal_t::dtor(); +#endif + + } catch(std::exception& e) { + fprintf(stderr, "Error in %s : %s\n", __func__, e.what()); + } +} + +static +void testPrepareImpl() { if(pvxServer) initialisePvxsServer(); // re-create server for next test cycle } +void testPrepare() +{ +#ifndef USE_PREPARE_CLEANUP_HOOKS + testPrepareImpl(); +#endif +} + void testShutdown() { #ifndef USE_DEINIT_HOOKS - pvxsAtExit(nullptr); + pvxsExitBeforeIocShutdown(nullptr); #endif } +void testAfterShutdown() +{ +#ifndef USE_DEINIT_HOOKS + pvxsExitAfterIocShutdown(nullptr); +#endif +} + +void testCleanupPrepare() +{ + server::Server trash; + { + Guard G(pvxServer->lock); + trash = std::move(pvxServer->srv); + } + resetGroups(); +} + //////////////////////////////////// // Two ioc shell commands for pvxs //////////////////////////////////// @@ -165,6 +208,35 @@ void pvxsi() { printf("%s", capture.str().c_str()); } +#ifdef USE_QSRV_SINGLE +TestIOC::TestIOC() { + testdbPrepare(); + testPrepare(); +} + +void TestIOC::init() { + if(!isRunning) { + testIocInitOk(); + isRunning = true; + } +} + +void TestIOC::shutdown() { + if(isRunning) { + isRunning = false; + testShutdown(); + testIocShutdownOk(); + testAfterShutdown(); + } +} + +TestIOC::~TestIOC() { + shutdown(); + testCleanupPrepare(); + testdbCleanup(); +} +#endif // USE_QSRV_SINGLE + namespace { void pvxrefshow() { @@ -236,26 +308,53 @@ void pvxrefdiff() { } // namespace -/** - * Initialise and control state of pvxs ioc server instance in response to iocInitHook events. - * Installed on the initHookState hook this function will respond to the following events: - * - initHookAfterInitDatabase: Set the exit callback only when we have initialized the database - * - initHookAfterCaServerRunning: Start the pvxs server instance after the CA server starts running - * - initHookAfterCaServerPaused: Pause the pvxs server instance if the CA server pauses - * - * @param theInitHookState the initHook state to respond to - */ static -void pvxsInitHook(initHookState theInitHookState) { +void pvxsInitHook(initHookState theInitHookState) noexcept { switch(theInitHookState) { +#ifdef USE_PREPARE_CLEANUP_HOOKS + case initHookAfterPrepareDatabase: // test only + testPrepareImpl(); + break; +#endif + case initHookAtBeginning: + dbRegisterQSRV2(); + break; + case initHookAfterCaLinkInit: +#ifdef USE_PVA_LINKS + linkGlobal_t::alloc(); +#endif +#ifndef USE_DEINIT_HOOKS + // before epicsExit(exitDatabase), + // so hook registered here will be run after iocShutdown() + { + static bool installed = false; + if(!installed) { + epicsAtExit(&pvxsExitAfterIocShutdown, nullptr); + installed = true; + } + } +#endif + break; case initHookAfterInitDatabase: - // when de-init hooks not available, register for later cleanup via atexit() - // function to run before exitDatabase + processGroups(); #ifndef USE_DEINIT_HOOKS - epicsAtExit(&pvxsAtExit, nullptr); + // register for later cleanup before iocShutdown() + { + static bool installed = false; + if(!installed) { + epicsAtExit(&pvxsExitBeforeIocShutdown, nullptr); + installed = true; + } + } +#endif + break; + case initHookAfterIocBuilt: +#ifdef USE_PVA_LINKS + linkGlobal_t::init(); #endif + addSingleSrc(); + addGroupSrc(); break; - case initHookAfterCaServerRunning: case initHookAfterIocRunning: if(auto srv = server()) { srv.start(); @@ -271,7 +370,15 @@ void pvxsInitHook(initHookState theInitHookState) { #ifdef USE_DEINIT_HOOKS // use de-init hook when available case initHookAtShutdown: - pvxsAtExit(nullptr); + pvxsExitBeforeIocShutdown(nullptr); + break; + case initHookAfterShutdown: + pvxsExitAfterIocShutdown(nullptr); + break; +#endif +#ifdef USE_PREPARE_CLEANUP_HOOKS + case initHookBeforeCleanupDatabase: // test only + testCleanupPrepare(); break; #endif default: @@ -286,6 +393,56 @@ using namespace pvxs::ioc; namespace { +bool enable2() { + // detect if also linked with qsrv.dbd + const bool permit = !registryDeviceSupportFind("devWfPDBDemo"); + bool request = permit; + bool quiet = false; + + auto env_dis = getenv("EPICS_IOC_IGNORE_SERVERS"); + auto env_ena = getenv("PVXS_QSRV_ENABLE"); + + if(env_dis && strstr(env_dis, "qsrv2")) { + request = false; + quiet = true; + + } else if(env_ena && epicsStrCaseCmp(env_ena, "YES")==0) { + request = true; + + } else if(env_ena && epicsStrCaseCmp(env_ena, "NO")==0) { + request = false; + quiet = true; + + } else if(env_ena) { + // will be seen during initialization, print synchronously + fprintf(stderr, "ERROR: PVXS_QSRV_ENABLE=%s not YES/NO. Defaulting to %s.\n", + env_ena, + request ? "YES" : "NO"); + } + + const bool enable = permit && request; + + if(quiet) { + // shut up, I know what I'm doing... + } else if(request && !permit) { + fprintf(stderr, + "WARNING: QSRV1 detected, disabling QSRV2.\n" + " If not intended, omit qsrv.dbd when including pvxsIoc.dbd\n"); + + } else { + printf("INFO: PVXS QSRV2 is loaded, %spermitted, and %s.\n", + permit ? "" : "NOT ", + enable ? "ENABLED" : "disabled"); + + if(!permit) { + printf(" Not permitted due to confict with QSRV1.\n" + " Remove qsrv.dbd from IOC.\n"); + } + } + + return enable; +} + /** * IOC pvxs base registrar. This implements the required registrar function that is called by xxxx_registerRecordDeviceDriver, * the auto-generated stub created for all IOC implementations. @@ -296,10 +453,12 @@ namespace { * 2. Also make sure that you initialize your server implementation - PVXS in our case - so that it will be available for the shell. * 3. Lastly register your hook handler to handle any state hooks that you want to implement */ -void pvxsBaseRegistrar() { +void pvxsBaseRegistrar() noexcept { try { pvxs::logger_config_env(); + bool enableQ = enable2(); + pvxServer = new pvxServer_t(); IOCShCommand("pvxsr", "[show_detailed_information?]", "PVXS Server Report. " @@ -320,6 +479,12 @@ void pvxsBaseRegistrar() { // Register our hook handler to intercept certain state changes initHookRegister(&pvxsInitHook); + + if(enableQ) { + single_enable(); + group_enable(); + pvalink_enable(); + } } catch (std::exception& e) { fprintf(stderr, "Error in %s : %s\n", __func__, e.what()); } diff --git a/ioc/iocsource.cpp b/ioc/iocsource.cpp index 8ebabf96b..869d7d830 100644 --- a/ioc/iocsource.cpp +++ b/ioc/iocsource.cpp @@ -36,46 +36,6 @@ DEFINE_LOGGER(_log, "pvxs.ioc.db"); namespace pvxs { namespace ioc { - -bool IOCSource::enabled() -{ - /* -1 - disabled - * 0 - lazy init, check environment - * 1 - enabled - */ - static std::atomic ena{}; - - auto e = ena.load(); - if(e==0) { - e = inUnitTest() ? 1 : -1; // default to disabled normally (not unittest) - - auto env_dis = getenv("EPICS_IOC_IGNORE_SERVERS"); - auto env_ena = getenv("PVXS_QSRV_ENABLE"); - - if(env_dis && strstr(env_dis, "qsrv2")) { - e = -1; - - } else if(env_ena && epicsStrCaseCmp(env_ena, "YES")==0) { - e = 1; - - } else if(env_ena && epicsStrCaseCmp(env_ena, "NO")==0) { - e = -1; - - } else if(env_ena) { - // will be seen during initialization, print synchronously - fprintf(stderr, "ERROR: PVXS_QSRV_ENABLE=%s not YES/NO. Defaulting to %s.\n", - env_ena, - e==1 ? "YES" : "NO"); - } - printf("INFO: PVXS QSRV2 is loaded and %s\n", - e==1 ? "ENABLED." : "disabled.\n" - " To enable set: epicsEnvSet(\"PVXS_QSRV_ENABLE\",\"YES\")\n" - " and ensure that $EPICS_IOC_IGNORE_SERVERS does not contain \"qsrv2\"."); - ena = e; - } - return e==1; -} - void IOCSource::initialize(Value& value, const MappingInfo &info, const Channel& chan) { if(info.type==MappingInfo::Scalar) { diff --git a/ioc/iocsource.h b/ioc/iocsource.h index 9b16bd73a..c4b27795e 100644 --- a/ioc/iocsource.h +++ b/ioc/iocsource.h @@ -43,8 +43,6 @@ enum type { class IOCSource { public: - static bool enabled(); - static void initialize(Value& value, const MappingInfo &info, const Channel &chan); static void get(Value& valuePrototype, diff --git a/ioc/pvalink.cpp b/ioc/pvalink.cpp new file mode 100644 index 000000000..5a8f398f3 --- /dev/null +++ b/ioc/pvalink.cpp @@ -0,0 +1,335 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include + +#define EPICS_DBCA_PRIVATE_API +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define PVXS_ENABLE_EXPERT_API + +#include + +#include "channel.h" +#include "pvalink.h" +#include "dblocker.h" +#include "dbentry.h" +#include "iocshcommand.h" +#include "utilpvt.h" +#include "qsrvpvt.h" + +#include /* redirects stdout/stderr; include after util.h from libevent */ +#include /* defines epicsExportSharedSymbols */ + +#if EPICS_VERSION_INT>=VERSION_INT(7,0,6,0) +# define HAVE_SHUTDOWN_HOOKS +#endif + +namespace pvxs { +namespace ioc { + +void linkGlobal_t::alloc() +{ + if(linkGlobal) { + cantProceed("# Missing call to testqsrvShutdownOk() and/or testqsrvCleanup()"); + } + linkGlobal = new linkGlobal_t; + + // TODO "local" provider + if (inUnitTest()) { + linkGlobal->provider_remote = ioc::server().clientConfig().build(); + } else { + linkGlobal->provider_remote = client::Config().build(); + } +} + +void linkGlobal_t::init() +{ + Guard G(linkGlobal->lock); + linkGlobal->running = true; + + for(linkGlobal_t::channels_t::iterator it(linkGlobal->channels.begin()), end(linkGlobal->channels.end()); + it != end; ++it) + { + std::shared_ptr chan(it->second.lock()); + if(!chan) continue; + + chan->open(); + } +} + +void linkGlobal_t::deinit() +{ + // no locking here as we assume that shutdown doesn't race startup + if(!linkGlobal) return; + + linkGlobal->close(); +} + +void linkGlobal_t::dtor() +{ + { + Guard G(linkGlobal->lock); + assert(pvaLink::cnt_pvaLink<=1u); // dbRemoveLink() already called + assert(linkGlobal->channels.empty()); + } + + delete linkGlobal; + linkGlobal = NULL; +} + +static +std::shared_ptr testGetPVALink(struct link *plink) +{ + DBLocker lock(plink->precord); + + if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) { + testAbort("Not a PVA link"); + } + pvaLink *pval = static_cast(plink->value.json.jlink); + if(!pval->lchan) + testAbort("PVA link w/o channel?"); + return pval->lchan; +} + +static +DBLINK* testGetLink(const char *pv) +{ + Channel chan(pv); + switch(dbChannelFieldType(chan)) { + case DBF_INLINK: + case DBF_OUTLINK: + case DBF_FWDLINK: + break; + default: + testAbort("%s : not a link field", pv); + } + return static_cast(dbChannelField(chan)); +} + +void testqsrvWaitForLinkConnected(struct link *plink, bool conn) +{ + if(conn) + linkGlobal->provider_remote.hurryUp(); + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + while(lchan->connected!=conn) { + testDiag("%s(\"%s\", %c) sleep", __func__, plink->precord->name, conn?'C':'D'); + UnGuard U(G); + if(!lchan->update_evt.wait(10.0)) + testAbort("%s(\"%s\") timeout", __func__, plink->precord->name); + errlogFlush(); + testDiag("%s(\"%s\") wakeup", __func__, plink->precord->name); + } + errlogFlush(); +} + +void testqsrvWaitForLinkConnected(const char* pv, bool conn) +{ + testqsrvWaitForLinkConnected(testGetLink(pv), conn); +} + +QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(struct link *plink) + :plink(plink) +{ + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + seq = lchan->update_seq; + testDiag("%s(\"%s\") arm at %u", __func__, plink->precord->name, seq); +} + +QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(const char *pv) + :QSrvWaitForLinkUpdate(testGetLink(pv)) +{} + +QSrvWaitForLinkUpdate::~QSrvWaitForLinkUpdate() +{ + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + while(seq == lchan->update_seq) { + testDiag("%s(\"%s\") wait for end of %u", __func__, plink->precord->name, seq); + bool ok; + { + UnGuard U(G); + ok = lchan->update_evt.wait(5.0); + } + if(!ok) + testAbort("%s(\"%s\") timeout at %u", __func__, plink->precord->name, seq); + errlogFlush(); + testDiag("%s(\"%s\") wake at %u", __func__, plink->precord->name, seq); + } +} + +extern "C" +void dbpvar(const char *precordname, int level) +{ + try { + if(!linkGlobal) { + printf("PVA links not initialized\n"); + return; + } + + if (!precordname || precordname[0] == '\0' || !strcmp(precordname, "*")) { + precordname = NULL; + printf("PVA links in all records\n\n"); + } else { + printf("PVA links in record named '%s'\n\n", precordname); + } + + size_t nchans = 0, nlinks = 0, nconn = 0; + + linkGlobal_t::channels_t channels; + { + Guard G(linkGlobal->lock); + channels = linkGlobal->channels; // copy snapshot + } + + for(linkGlobal_t::channels_t::const_iterator it(channels.begin()), end(channels.end()); + it != end; ++it) + { + std::shared_ptr chan(it->second.lock()); + if(!chan) continue; + + Guard G(chan->lock); + + if(precordname) { + // only show links fields of these records + bool match = false; + for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end()); + it2 != end2; ++it2) + { + const pvaLink *pval = *it2; + // plink==NULL shouldn't happen, but we are called for debugging, so be paranoid. + if(pval->plink && epicsStrGlobMatch(pval->plink->precord->name, precordname)) { + match = true; + nlinks++; + } + } + if(!match) + continue; + } + + nchans++; + if(chan->connected) + nconn++; + + if(!precordname) + nlinks += chan->links.size(); + + if(level<=0) + continue; + + if(level>=2 || (!chan->connected && level==1)) { + if(chan->key.first.size()<=28) { + printf("%28s ", chan->key.first.c_str()); + } else { + printf("%s\t", chan->key.first.c_str()); + } + + printf("conn=%c %zu disconnects, %zu type changes", + chan->connected?'T':'F', + chan->num_disconnect, + chan->num_type_change); + if(chan->op_put) { + printf(" Put"); + } + + printf("\n"); + // level 4 reserved for channel/provider details + + if(level>=5) { + for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end()); + it2 != end2; ++it2) + { + const pvaLink *pval = *it2; + + if(!pval->plink) + continue; + else if(precordname && !epicsStrGlobMatch(pval->plink->precord->name, precordname)) + continue; + + const char *fldname = "???"; + ioc::DBEntry rec(pval->plink->precord); + for(bool done = !!dbFirstField(rec, 0); !done; done = !!dbNextField(rec, 0)) + { + if(rec->pfield == (void*)pval->plink) { + fldname = rec->pflddes->name; + break; + } + } + + printf("%*s%s.%s", 30, "", pval->plink ? pval->plink->precord->name : "", fldname); + + switch(pval->proc) { + case pvaLinkConfig::NPP: printf(" NPP"); break; + case pvaLinkConfig::Default: printf(" Def"); break; + case pvaLinkConfig::PP: printf(" PP"); break; + case pvaLinkConfig::CP: printf(" CP"); break; + case pvaLinkConfig::CPP: printf(" CPP"); break; + } + switch(pval->sevr) { + case pvaLinkConfig::NMS: printf(" NMS"); break; + case pvaLinkConfig::MS: printf(" MS"); break; + case pvaLinkConfig::MSI: printf(" MSI"); break; + } + + printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d\n", + unsigned(pval->queueSize), + pval->pipeline ? 'T' : 'F', + pval->defer ? 'T' : 'F', + pval->time ? 'T' : 'F', + pval->retry ? 'T' : 'F', + pval->monorder); + } + printf("\n"); + } + } + } + + printf(" %zu/%zu channels connected used by %zu links\n", + nconn, nchans, nlinks); + + } catch(std::exception& e) { + fprintf(stderr, "Error: %s\n", e.what()); + } +} + +static +const iocshVarDef pvaLinkNWorkersDef[] = { + { + "pvaLinkNWorkers", + iocshArgInt, + &pvaLinkNWorkers + }, + {0, iocshArgInt, 0} +}; + +void pvalink_enable() +{ + IOCShCommand("dbpvar", "dbpvar", "record name", "level") + .implementation<&dbpvar>(); + iocshRegisterVariable(pvaLinkNWorkersDef); + +} + +}} // namespace pvxs::ioc diff --git a/ioc/pvalink.h b/ioc/pvalink.h new file mode 100644 index 000000000..ce3bf9fca --- /dev/null +++ b/ioc/pvalink.h @@ -0,0 +1,273 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PVALINK_H +#define PVALINK_H + +#include +#include + +#define EPICS_DBCA_PRIVATE_API +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "utilpvt.h" +#include "dbmanylocker.h" + + +#if EPICS_VERSION_INT Guard; +typedef epicsGuardRelease UnGuard; + +struct pvaLink; +struct pvaLinkChannel; + +extern lset pva_lset; +extern jlif lsetPVA; + +struct pvaLinkConfig : public jlink +{ + // configuration, output of jlif parsing + //! Channel (aka PV) name string + std::string channelName; + //! sub-field within addressed PVStructure + std::string fieldName; + + size_t queueSize = 4; + + enum pp_t { + NPP, + Default, // for put() only. For monitor, treated as NPP + PP, // for put() only, For monitor, treated as NPP + CP, // for monitor only, put treats as pp + CPP, // for monitor only, put treats as pp + } proc = Default; + enum ms_t { + NMS, + MS, + MSI, + } sevr = NMS; + + bool defer = false; + bool pipeline = false; + bool time = false; + bool retry = false; + bool local = false; + bool always = false; + bool atomic = false; + int monorder = 0; + + // internals used by jlif parsing + std::string jkey; + + pvaLinkConfig() = default; + pvaLinkConfig(const pvaLinkConfig&) = delete; + pvaLinkConfig& operator=(const pvaLinkConfig&) = delete; + virtual ~pvaLinkConfig(); +}; + +struct linkGlobal_t final : private epicsThreadRunable { + client::Context provider_remote; + + MPMCFIFO> queue; + + epicsMutex lock; + + bool running; // set after dbEvent is initialized and safe to use + + // a tuple of channel name and printed pvRequest (or Monitor) + typedef std::pair channels_key_t; + // pvaLinkChannel dtor prunes dead entires + typedef std::map > channels_t; + // Cache of active Channels (really about caching Monitor) + channels_t channels; + + // pvRequest used with PUT + const Value putReq; + +private: + epicsThread worker; + bool workerStop = false; + virtual void run() override final; +public: + + linkGlobal_t(); + linkGlobal_t(const linkGlobal_t&) = delete; + linkGlobal_t& operator=(const linkGlobal_t&) = delete; + virtual ~linkGlobal_t(); + void close(); + + // IOC lifecycle hooks + static void alloc(); + static void init(); + static void deinit(); + static void dtor(); +}; +extern linkGlobal_t *linkGlobal; + +struct pvaLinkChannel final : public epicsThreadRunable + ,public std::enable_shared_from_this +{ + const linkGlobal_t::channels_key_t key; // tuple of (channelName, pvRequest key) + const Value pvRequest; // used with monitor + + INST_COUNTER(pvaLinkChannel); + + // locker order: record lock(s) -> channel lock + epicsMutex lock; + epicsEvent update_evt; // used by testing code + + std::shared_ptr op_mon; + std::shared_ptr op_put; + Value root; + + size_t num_disconnect = 0u, num_type_change = 0u; + + bool connected = false; + bool debug = false; // set if any jlink::debug is set + + unsigned update_seq = 0u; // used by testing code + + typedef std::set after_put_t; + after_put_t after_put; + + struct LinkSort { + bool operator()(const pvaLink *L, const pvaLink *R) const; + }; + + typedef std::set links_t; + + // list of currently attached links. maintained by pvaLink ctor/dtor + // TODO: sort by PHAS + links_t links; + + // set when 'links' is modified to trigger re-compute of record scan list + bool links_changed = false; + + pvaLinkChannel(const linkGlobal_t::channels_key_t& key, const Value &pvRequest); + virtual ~pvaLinkChannel(); + + void open(); + void put(bool force=false); // begin Put op. + + struct AfterPut final : public epicsThreadRunable { + std::weak_ptr lc; + AfterPut() = default; + AfterPut(const AfterPut&) = delete; + AfterPut& operator=(const AfterPut&) = delete; + virtual ~AfterPut() = default; + virtual void run() override final; + }; + const std::shared_ptr AP; +private: + virtual void run() override final; + + // ==== Treat remaining as local to run() + + struct ScanTrack { + dbCommon *prec = nullptr; + // if true, only scan if prec->scan==0 + bool check_passive = false; + + ScanTrack() = default; + ScanTrack(dbCommon *prec, bool check_passive) :prec(prec), check_passive(check_passive) {} + void scan(); + }; + std::vector nonatomic_records, + atomic_records; + + ioc::DBManyLock atomic_lock; +}; + +struct pvaLink final : public pvaLinkConfig +{ + INST_COUNTER(pvaLink); + + bool alive = true; // attempt to catch some use after free + dbfType type = (dbfType)-1; + + DBLINK * plink = nullptr; + + std::shared_ptr lchan; + + bool used_scratch = false; + bool used_queue = false; + shared_array put_scratch, put_queue; + + // cached fields from channel op_mon + // updated in onTypeChange() + Value fld_value, + fld_severity, + fld_message, + fld_seconds, + fld_nanoseconds, + fld_usertag, + fld_meta; + + // cached snapshot of alarm and timestamp + // captured in pvaGetValue(). + // we choose not to ensure consistency with display/control meta-data + epicsTimeStamp snap_time = {}; + epicsUTag snap_tag = 0; + short snap_severity = INVALID_ALARM; + std::string snap_message; + + pvaLink(); + virtual ~pvaLink(); + + // returns pvRequest to be used with monitor + Value makeRequest(); + + bool valid() const; + + void onDisconnect(); + void onTypeChange(); + enum scanOnUpdate_t { + scanOnUpdateNo = -1, + scanOnUpdatePassive = 0, + scanOnUpdateYes = 1, + }; + scanOnUpdate_t scanOnUpdate() const; +}; + + +}} // namespace pvxs::ioc + +#endif // PVALINK_H diff --git a/ioc/pvalink_channel.cpp b/ioc/pvalink_channel.cpp new file mode 100644 index 000000000..d4d4a7866 --- /dev/null +++ b/ioc/pvalink_channel.cpp @@ -0,0 +1,439 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include + +#include "utilpvt.h" +#include "pvalink.h" +#include "dblocker.h" +#include "dbmanylocker.h" + +DEFINE_LOGGER(_logger, "pvxs.ioc.link.channel"); +DEFINE_LOGGER(_logupdate, "pvxs.ioc.link.channel.update"); + +int pvaLinkNWorkers = 1; + +namespace pvxs { +namespace ioc { + +linkGlobal_t *linkGlobal; + + +linkGlobal_t::linkGlobal_t() + :queue() + ,running(false) + ,putReq(TypeDef(TypeCode::Struct, { + members::Struct("field", {}), + members::Struct("record", { + members::Struct("_options", { + members::Bool("block"), + members::String("process"), + }), + }), }).create()) + ,worker(*this, + "pvxlink", + epicsThreadGetStackSize(epicsThreadStackBig), + // worker should be above PVA worker priority? + epicsThreadPriorityMedium) +{ + // TODO respect pvaLinkNWorkers? + worker.start(); +} + +linkGlobal_t::~linkGlobal_t() +{ +} + +void linkGlobal_t::run() +{ + while(1) { + auto w = queue.pop(); + if(auto chan = w.lock()) { + chan->run(); + } + { + Guard G(lock); + if(workerStop) + break; + } + } + +} + +void linkGlobal_t::close() +{ + { + Guard G(lock); + workerStop = true; + } + queue.push(std::weak_ptr()); + worker.exitWait(); +} + +DEFINE_INST_COUNTER(pvaLinkChannel); +DEFINE_INST_COUNTER(pvaLink); + +bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const { + if(L->monorder==R->monorder) + return L < R; + return L->monorder < R->monorder; +} + +// being called with linkGlobal::lock held +pvaLinkChannel::pvaLinkChannel(const linkGlobal_t::channels_key_t &key, const Value& pvRequest) + :key(key) + ,pvRequest(pvRequest) + ,AP(new AfterPut) +{} + +pvaLinkChannel::~pvaLinkChannel() { + { + Guard G(linkGlobal->lock); + linkGlobal->channels.erase(key); + } + + Guard G(lock); + + assert(links.empty()); +} + +void pvaLinkChannel::open() +{ + Guard G(lock); + + op_mon = linkGlobal->provider_remote.monitor(key.first) + .maskConnected(true) + .maskDisconnected(false) + .rawRequest(pvRequest) + .event([this](const client::Subscription&) + { + log_debug_printf(_logger, "Monitor %s wakeup\n", key.first.c_str()); + try { + linkGlobal->queue.push(shared_from_this()); + }catch(std::bad_weak_ptr&){ + log_err_printf(_logger, "channel '%s' open during dtor?", key.first.c_str()); + } + }) + .exec(); +} + +static +Value linkBuildPut(pvaLinkChannel *self, Value&& prototype) +{ + Guard G(self->lock); + + auto top(std::move(prototype)); + + for(auto link : self->links) + { + if(!link->used_queue) continue; + link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop + + auto value(link->fieldName.empty() ? top : top[link->fieldName]); + if(value.type()==TypeCode::Struct) { + // maybe drill into NTScalar et al. + if(auto sub = value["value"]) + value = std::move(sub); + } + + if(!value) continue; // TODO: how to signal error? + + auto tosend(std::move(link->put_queue)); + + if(value.type().isarray()) { + value = tosend; + } else { + if (tosend.empty()) + continue; // TODO: can't write empty array to scalar field Signal error + + if (value.type() == TypeCode::Struct && value.id() == "enum_t") { + value = value["index"]; // We want to assign to the index for enum types + } + + switch (tosend.original_type()) + { + case ArrayType::Int8: value = tosend.castTo()[0]; break; + case ArrayType::Int16: value = tosend.castTo()[0]; break; + case ArrayType::Int32: value = tosend.castTo()[0]; break; + case ArrayType::Int64: value = tosend.castTo()[0]; break; + case ArrayType::UInt8: value = tosend.castTo()[0]; break; + case ArrayType::UInt16: value = tosend.castTo()[0]; break; + case ArrayType::UInt32: value = tosend.castTo()[0]; break; + case ArrayType::UInt64: value = tosend.castTo()[0]; break; + case ArrayType::Float32: value = tosend.castTo()[0]; break; + case ArrayType::Float64: value = tosend.castTo()[0]; break; + case ArrayType::String: value = tosend.castTo()[0]; break; + case ArrayType::Bool: + case ArrayType::Null: + case ArrayType::Value: + std::ostringstream buffer; + buffer << tosend.original_type(); + log_exc_printf(_logger, "Unsupported type %s\n", buffer.str().c_str()); + } + } + } + log_debug_printf(_logger, "%s put built\n", self->key.first.c_str()); + + return top; +} + +void linkPutDone(pvaLinkChannel *self, client::Result&& result) +{ + bool ok = false; + try { + result(); + ok = true; + }catch(std::exception& e){ + errlogPrintf("%s PVA link put ERROR: %s\n", self->key.first.c_str(), e.what()); + // TODO: signal INVALID_ALARM ? + } + + bool needscans; + { + Guard G(self->lock); + + log_debug_printf(_logger, "%s put result %s\n", self->key.first.c_str(), ok ? "OK" : "Not OK"); + + needscans = !self->after_put.empty(); + self->op_put.reset(); + + if(ok) { + // see if we need start a queue'd put + self->put(); + } + } + + log_debug_printf(_logger, "linkPutDone: %s, needscans = %i\n", self->key.first.c_str(), needscans); + + if(needscans) { + linkGlobal->queue.push(self->AP); + } +} + +// call with channel lock held +void pvaLinkChannel::put(bool force) +{ + auto pvReq(linkGlobal->putReq.cloneEmpty() + .update("record._options.block", !after_put.empty())); + + unsigned reqProcess = 0; + bool doit = force; + for(auto& link : links) + { + if(!link->used_scratch) continue; + + link->put_queue = std::move(link->put_scratch); + link->used_scratch = false; + link->used_queue = true; + + doit = true; + + switch(link->proc) { + case pvaLink::NPP: + reqProcess |= 1; + break; + case pvaLink::Default: + break; + case pvaLink::PP: + case pvaLink::CP: + case pvaLink::CPP: + reqProcess |= 2; + break; + } + } + + /* By default, use remote default (passive). + * Request processing, or not, if any link asks. + * Prefer PP over NPP if both are specified. + * + * TODO: per field granularity? + */ + const char *proc = "passive"; + if((reqProcess&2) || force) { + proc = "true"; + } else if(reqProcess&1) { + proc = "false"; + } + pvReq["record._options.process"] = proc; + + log_debug_printf(_logger, "%s Start put %s\n", key.first.c_str(), doit ? "true": "false"); + if(doit) { + // start net Put, cancels in-progress put + op_put = linkGlobal->provider_remote.put(key.first) + .rawRequest(pvReq) + .build([this](Value&& prototype) -> Value + { + return linkBuildPut(this, std::move(prototype)); // TODO + }) + .result([this](client::Result&& result) + { + linkPutDone(this, std::move(result)); + }) + .exec(); + } +} + +void pvaLinkChannel::AfterPut::run() +{ + std::set toscan; + std::shared_ptr link(lc.lock()); + if(!link) + return; + + { + Guard G(link->lock); + toscan.swap(link->after_put); + } + + for(after_put_t::iterator it=toscan.begin(), end=toscan.end(); + it!=end; ++it) + { + dbCommon *prec = *it; + dbScanLock(prec); + log_debug_printf(_logger, "AfterPut start processing %s\n", prec->name); + if(prec->pact) { // complete async. processing + (prec)->rset->process(prec); + + } else { + // maybe the result of "cancellation" or some record support logic error? + errlogPrintf("%s : not PACT when async PVA link completed. Logic error?\n", prec->name); + } + dbScanUnlock(prec); + } + +} + +// caller has locked record +void pvaLinkChannel::ScanTrack::scan() +{ + if(check_passive && prec->scan!=0) { + + } else if (prec->pact) { + if (prec->tpro) + printf("%s: Active %s\n", epicsThreadGetNameSelf(), prec->name); + prec->rpro = TRUE; + + } else { + (void)dbProcess(prec); + } +} + +// Running from global WorkQueue thread +void pvaLinkChannel::run() +{ + { + Guard G(lock); + + log_debug_printf(_logger,"Monitor %s work\n", this->key.first.c_str()); + + Value top; + try { + top = op_mon->pop(); + if(!top) { + log_debug_printf(_logger, "Monitor %s empty\n", this->key.first.c_str()); + return; + } + if(!connected) { + // (re)connect implies type change + log_debug_printf(_logger, "Monitor %s reconnect\n", this->key.first.c_str()); + + root = top; // re-create cache + connected = true; + num_type_change++; + + for(auto link : links) { + link->onTypeChange(); + } + + } else { // update cache + root.assign(top); + } + log_debug_printf(_logupdate, "Monitor %s value %s\n", this->key.first.c_str(), + std::string(SB()<key.first.c_str()); + + connected = false; + + num_disconnect++; + + // cancel pending put operations + op_put.reset(); + + for(auto link : links) { + link->onDisconnect(); + link->snap_time = e.time; + } + + // Don't clear previous_root on disconnect. + // while disconnected, we will provide the most recent value w/ LINK_ALARM + + } catch(std::exception& e) { + log_exc_printf(_logger, "pvalinkChannel::run: Unexpected exception: %s\n", e.what()); + } + + if(links_changed) { + // a link has been added or removed since the last update. + // rebuild our cached list of records to (maybe) process. + + decltype(atomic_records) atomic, nonatomic; + std::vector atomicrecs; + + for(auto link : links) { + assert(link && link->alive); + + auto sou(link->scanOnUpdate()); + if(sou==pvaLink::scanOnUpdateNo) + continue; + + bool check_passive = sou==pvaLink::scanOnUpdatePassive; + + if(link->atomic) { + atomicrecs.push_back(link->plink->precord); + atomic.emplace_back(link->plink->precord, check_passive); + } else { + nonatomic.emplace_back(link->plink->precord, check_passive); + } + } + + log_debug_printf(_logger, "Links changed, %zu with %zu atomic, %zu nonatomic\n", + links.size(), atomic.size(), nonatomic.size()); + + atomic_lock = ioc::DBManyLock(atomicrecs); + atomic_records = std::move(atomic); + nonatomic_records = std::move(nonatomic); + + links_changed = false; + } + + update_seq++; + update_evt.signal(); + log_debug_printf(_logger, "%s Sequence point %u\n", key.first.c_str(), update_seq); + } + // unlock link + + if(!atomic_records.empty()) { + ioc::DBManyLocker L(atomic_lock); + for(auto& trac : atomic_records) { + trac.scan(); + } + } + + for(auto& trac : nonatomic_records) { + ioc::DBLocker L(trac.prec); + trac.scan(); + } + + log_debug_printf(_logger, "Requeueing %s\n", key.first.c_str()); + // re-queue until monitor queue is empty + linkGlobal->queue.push(shared_from_this()); +} + +}} // namespace pvxs::ioc diff --git a/ioc/pvalink_jlif.cpp b/ioc/pvalink_jlif.cpp new file mode 100644 index 000000000..49b17a372 --- /dev/null +++ b/ioc/pvalink_jlif.cpp @@ -0,0 +1,307 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include "pvalink.h" + +#include // redirects stdout/stderr +#include + +namespace pvxs { +namespace ioc { +pvaLinkConfig::~pvaLinkConfig() {} + +namespace { + +/* link options. + * + * "pvname" # short-hand, sets PV name only + * + * { + * "pv":"name", + * "field":"blah.foo", + * "Q":5, + * "pipeline":false, + * "proc":true, // false, true, none, "", "NPP", "PP", "CP", "CPP" + * "sevr":true, // false, true, "NMS", "MS", "MSI", "MSS" + * "time":true, // false, true + * "monorder":#,// order of processing during CP scan + * "defer":true,// whether to immediately start Put, or only queue value to be sent + * "retry":true,// queue Put while disconnected, and retry on connect + * "always":true,// CP/CPP updates always process a like, even if its input field hasn't changed + * "local":false,// Require local channel + * } + */ + +jlink* pva_alloc_jlink(short) +{ + try { + return new pvaLink; + + }catch(std::exception& e){ + errlogPrintf("Error allocating pva link: %s\n", e.what()); + return NULL; + } +} + +#define TRY pvaLinkConfig *pvt = static_cast(pjlink); (void)pvt; try +#define CATCH(RET) catch(std::exception& e){ \ + errlogPrintf("Error in %s link: %s\n", __FUNCTION__, e.what()); \ + return RET; } + +void pva_free_jlink(jlink *pjlink) +{ + TRY { + delete pvt; + }catch(std::exception& e){ + errlogPrintf("Error freeing pva link: %s\n", e.what()); + } +} + +jlif_result pva_parse_null(jlink *pjlink) +{ + TRY { + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "proc") { + pvt->proc = pvaLinkConfig::Default; + } else if(pvt->jkey == "sevr") { + pvt->sevr = pvaLinkConfig::NMS; + } else if(pvt->jkey == "local") { + pvt->local = false; // alias for local:false + } else if(pvt->debug) { + printf("pva link parsing unknown none depth=%u key=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str()); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_bool(jlink *pjlink, int val) +{ + TRY { +// TRACE(<jkey<<" "<<(val?"true":"false")); + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "proc") { + pvt->proc = val ? pvaLinkConfig::PP : pvaLinkConfig::NPP; + } else if(pvt->jkey == "sevr") { + pvt->sevr = val ? pvaLinkConfig::MS : pvaLinkConfig::NMS; + } else if(pvt->jkey == "defer") { + pvt->defer = !!val; + } else if(pvt->jkey == "pipeline") { + pvt->pipeline = !!val; + } else if(pvt->jkey == "time") { + pvt->time = !!val; + } else if(pvt->jkey == "retry") { + pvt->retry = !!val; + } else if(pvt->jkey == "local") { + pvt->local = !!val; + } else if(pvt->jkey == "always") { + pvt->always = !!val; + } else if(pvt->jkey == "atomic") { + pvt->atomic = !!val; + } else if(pvt->debug) { + printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%s\n", + pvt->parseDepth, pvt->jkey.c_str(), val ? "true" : "false"); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_integer(jlink *pjlink, long long val) +{ + TRY { + if(pvt->parseDepth!=1) { + // ignore + } else if(pvt->jkey == "Q") { + pvt->queueSize = val < 1 ? 1 : size_t(val); + } else if(pvt->jkey == "monorder") { + pvt->monorder = std::max(-1024, std::min(int(val), 1024)); + } else if(pvt->debug) { + printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%lld\n", + pvt->parseDepth, pvt->jkey.c_str(), val); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_string(jlink *pjlink, const char *val, size_t len) +{ + TRY{ + std::string sval(val, len); + if(pvt->parseDepth==0 || (pvt->parseDepth==1 && pvt->jkey=="pv")) { + pvt->channelName = sval; + + } else if(pvt->parseDepth > 1) { + // ignore + + } else if(pvt->jkey=="field") { + pvt->fieldName = sval; + + } else if(pvt->jkey=="proc") { + if(sval.empty()) { + pvt->proc = pvaLinkConfig::Default; + } else if(sval=="CP") { + pvt->proc = pvaLinkConfig::CP; + } else if(sval=="CPP") { + pvt->proc = pvaLinkConfig::CPP; + } else if(sval=="PP") { + pvt->proc = pvaLinkConfig::PP; + } else if(sval=="NPP") { + pvt->proc = pvaLinkConfig::NPP; + } else if(pvt->debug) { + printf("pva link parsing unknown proc depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + } else if(pvt->jkey=="sevr") { + if(sval=="NMS") { + pvt->sevr = pvaLinkConfig::NMS; + } else if(sval=="MS") { + pvt->sevr = pvaLinkConfig::MS; + } else if(sval=="MSI") { + pvt->sevr = pvaLinkConfig::MSI; + } else if(sval=="MSS") { + // not sure how to handle mapping severity for MSS. + // leave room for this to happen compatibly later by + // handling as alias for MS until then. + pvt->sevr = pvaLinkConfig::MS; + } else if(pvt->debug) { + printf("pva link parsing unknown sevr depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + } else if(pvt->debug) { + printf("pva link parsing unknown string depth=%u key=\"%s\" value=\"%s\"\n", + pvt->parseDepth, pvt->jkey.c_str(), sval.c_str()); + } + + pvt->jkey.clear(); + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_key_result pva_parse_start_map(jlink *pjlink) +{ + TRY { + return jlif_key_continue; + }CATCH(jlif_key_stop) +} + +jlif_result pva_parse_key_map(jlink *pjlink, const char *key, size_t len) +{ + TRY { + std::string sval(key, len); + pvt->jkey = sval; + + return jlif_continue; + }CATCH(jlif_stop) +} + +jlif_result pva_parse_end_map(jlink *pjlink) +{ + TRY { + return jlif_continue; + }CATCH(jlif_stop) +} + +struct lset* pva_get_lset(const jlink *pjlink) +{ + return &pva_lset; +} + +void pva_report(const jlink *rpjlink, int lvl, int indent) +{ + const pvaLink *pval = static_cast(rpjlink); + try { + (void)pval; + printf("%*s'pva': %s", indent, "", pval->channelName.c_str()); + if(!pval->fieldName.empty()) + printf("|.%s", pval->fieldName.c_str()); + + switch(pval->proc) { + case pvaLinkConfig::NPP: printf(" NPP"); break; + case pvaLinkConfig::Default: printf(" Def"); break; + case pvaLinkConfig::PP: printf(" PP"); break; + case pvaLinkConfig::CP: printf(" CP"); break; + case pvaLinkConfig::CPP: printf(" CPP"); break; + } + switch(pval->sevr) { + case pvaLinkConfig::NMS: printf(" NMS"); break; + case pvaLinkConfig::MS: printf(" MS"); break; + case pvaLinkConfig::MSI: printf(" MSI"); break; + } + if(lvl>0) { + printf(" Q=%u pipe=%c defer=%c time=%c retry=%c atomic=%c morder=%d", + unsigned(pval->queueSize), + pval->pipeline ? 'T' : 'F', + pval->defer ? 'T' : 'F', + pval->time ? 'T' : 'F', + pval->retry ? 'T' : 'F', + pval->atomic ? 'T' : 'F', + pval->monorder); + } + + if(pval->lchan) { + // after open() + Guard G(pval->lchan->lock); + + printf(" conn=%c", pval->lchan->connected ? 'T' : 'F'); + if(pval->lchan->op_put) { + printf(" Put"); + } + + if(lvl>0) { + printf(" #disconn=%zu", pval->lchan->num_disconnect); + } +// if(lvl>5) { +// std::ostringstream strm; +// pval->lchan->chan.show(strm); +// printf("\n%*s CH: %s", indent, "", strm.str().c_str()); +// } + } else { + printf(" No Channel"); + } + printf("\n"); + }CATCH() +} + +} //namespace + +jlif lsetPVA = { + "pva", + &pva_alloc_jlink, + &pva_free_jlink, + &pva_parse_null, + &pva_parse_bool, + &pva_parse_integer, + NULL, + &pva_parse_string, + &pva_parse_start_map, + &pva_parse_key_map, + &pva_parse_end_map, + NULL, + NULL, + NULL, + &pva_get_lset, + &pva_report, + NULL +}; + +}} //namespace pvxs::ioc + +extern "C" { +using pvxs::ioc::lsetPVA; +epicsExportAddress(jlif, lsetPVA); +} diff --git a/ioc/pvalink_link.cpp b/ioc/pvalink_link.cpp new file mode 100644 index 000000000..388a8c0f5 --- /dev/null +++ b/ioc/pvalink_link.cpp @@ -0,0 +1,135 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include + +#include "pvalink.h" + +DEFINE_LOGGER(_logger, "pvxs.ioc.link.link"); + +namespace pvxs { +namespace ioc { + +pvaLink::pvaLink() +{ + //TODO: valgrind tells me these aren't initialized by Base, but probably should be. + parseDepth = 0; + parent = 0; +} + +pvaLink::~pvaLink() +{ + alive = false; + + if(lchan) { // may be NULL if parsing fails + Guard G(lchan->lock); + + lchan->links.erase(this); + lchan->links_changed = true; + + bool new_debug = false; + for(auto pval : lchan->links) { + if(pval->debug) { + new_debug = true; + break; + } + } + + lchan->debug = new_debug; + } +} + +Value pvaLink::makeRequest() +{ + // TODO: cache TypeDef in global + using namespace pvxs::members; + return TypeDef(TypeCode::Struct, { + Struct("field", {}), + Struct("record", { + Struct("_options", { + Bool("pipeline"), + Bool("atomic"), + UInt32("queueSize"), + }), + }), + }).create() + .update("record._options.pipeline", pipeline) + .update("record._options.atomic", true) + .update("record._options.queueSize", uint32_t(queueSize)); +} + +// caller must lock lchan->lock +bool pvaLink::valid() const +{ + return lchan->connected && lchan->root; +} + +// call with channel lock held +void pvaLink::onDisconnect() +{ + log_debug_printf(_logger, "%s disconnect\n", plink->precord->name); + // TODO: option to remain queue'd while disconnected + + used_queue = used_scratch = false; +} + +void pvaLink::onTypeChange() +{ + assert(lchan->connected && lchan->root); // we should only be called when connected + + fld_value = fld_severity = fld_nanoseconds = fld_usertag + = fld_message = fld_severity = fld_meta = Value(); + + Value root; + if(fieldName.empty()) { + root = lchan->root; + } else { + root = lchan->root[fieldName]; + } + if(!root) { + log_warn_printf(_logger, "%s has no %s\n", lchan->key.first.c_str(), fieldName.c_str()); + + } else if(root.type()!=TypeCode::Struct) { + log_debug_printf(_logger, "%s has no meta\n", lchan->key.first.c_str()); + fld_value = root; + + } else { + fld_value = root["value"]; + fld_seconds = root["timeStamp.secondsPastEpoch"]; + fld_nanoseconds = root["timeStamp.nanoseconds"]; + fld_usertag = root["timeStamp.userTag"]; + fld_severity = root["alarm.severity"]; + fld_message = root["alarm.message"]; + fld_meta = std::move(root); + } + + log_debug_printf(_logger, "%s type change V=%c S=%c N=%c S=%c M=%c\n", + plink->precord->name, + fld_value ? 'Y' : 'N', + fld_seconds ? 'Y' : 'N', + fld_nanoseconds ? 'Y' : 'N', + fld_severity ? 'Y' : 'N', + fld_meta ? 'Y' : 'N'); +} + +pvaLink::scanOnUpdate_t pvaLink::scanOnUpdate() const +{ + if(!plink) + return scanOnUpdateNo; + if(type!=DBF_INLINK) + return scanOnUpdateNo; + if(proc == pvaLink::CP) + return scanOnUpdateYes; + if(proc == pvaLink::CPP) + return scanOnUpdatePassive; + return scanOnUpdateNo; +} + +}} // namespace pvxs::ioc diff --git a/ioc/pvalink_lset.cpp b/ioc/pvalink_lset.cpp new file mode 100644 index 000000000..0fd46e57a --- /dev/null +++ b/ioc/pvalink_lset.cpp @@ -0,0 +1,707 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include + +#include +#include "dbentry.h" +#include "pvalink.h" +#include "utilpvt.h" + +#include // redirect stdout/stderr; include after libevent/util.h + +DEFINE_LOGGER(_logger, "pvxs.ioc.link.lset"); + +namespace pvxs { +namespace ioc { +namespace { + +#define TRY pvaLink *self = static_cast(plink->value.json.jlink); assert(self->alive); try +#define CATCH() catch(std::exception& e) { \ + errlogPrintf("pvaLink %s fails %s: %s\n", __func__, plink->precord->name, e.what()); \ +} + +#define CHECK_VALID() if(!self->valid()) { log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); return -1;} + +dbfType getLinkType(DBLINK *plink) +{ + ioc::DBEntry ent(plink->precord); + + for(long status = dbFirstField(ent, 0); !status; status = dbNextField(ent, 0)) { + if(ent->pfield==plink) + return ent->pflddes->field_type; + } + throw std::logic_error("DBLINK* corrupt"); +} + +void pvaOpenLink(DBLINK *plink) +{ + try { + pvaLink* self((pvaLink*)plink->value.json.jlink); + self->type = getLinkType(plink); + + if(self->local && dbChannelTest(self->channelName.c_str())!=0) { + // TODO: only print duing iocInit()? + fprintf(stderr, "%s Error: local:true link to '%s' can't be fulfilled\n", + plink->precord->name, self->channelName.c_str()); + plink->lset = NULL; + return; + } + + // workaround for Base not propagating info(base:lsetDebug to us + { + ioc::DBEntry rec(plink->precord); + + if(epicsStrCaseCmp(rec.info("base:lsetDebug", "NO"), "YES")==0) { + self->debug = 1; + } + } + + log_debug_printf(_logger, "%s OPEN %s sevr=%d\n", + plink->precord->name, self->channelName.c_str(), + self->sevr); + + // still single threaded at this point. + // also, no pvaLinkChannel::lock yet + + self->plink = plink; + + if(self->channelName.empty()) + return; // nothing to do... + + auto pvRequest(self->makeRequest()); + linkGlobal_t::channels_key_t key = std::make_pair(self->channelName, std::string(SB()< chan; + bool doOpen = false; + { + Guard G(linkGlobal->lock); + + linkGlobal_t::channels_t::iterator it(linkGlobal->channels.find(key)); + + if(it!=linkGlobal->channels.end()) { + // re-use existing channel + chan = it->second.lock(); + } + + if(!chan) { + // open new channel + + log_debug_printf(_logger, "%s CREATE %s\n", + plink->precord->name, self->channelName.c_str()); + + chan.reset(new pvaLinkChannel(key, pvRequest)); + chan->AP->lc = chan; + linkGlobal->channels.insert(std::make_pair(key, chan)); + doOpen = true; + + } else { + log_debug_printf(_logger, "%s REUSE %s\n", + plink->precord->name, self->channelName.c_str()); + } + + doOpen &= linkGlobal->running; // if not running, then open from initHook + } + + if(doOpen) { + chan->open(); // start subscription + } + + bool scanInit = false; + { + Guard G(chan->lock); + + chan->links.insert(self); + chan->links_changed = true; + + self->lchan = std::move(chan); // we are now attached + + self->lchan->debug |= !!self->debug; + + if(self->lchan->connected) { + self->onTypeChange(); + auto sou(self->scanOnUpdate()); + switch(sou) { + case pvaLink::scanOnUpdateNo: + break; + case pvaLink::scanOnUpdatePassive: + // record is locked + scanInit = plink->precord->scan==menuScanPassive; + break; + case pvaLink::scanOnUpdateYes: + scanInit = true; + break; + } + } + } + if(scanInit) { + // TODO: initial scan on linkGlobal worker? + scanOnce(plink->precord); + } + + return; + }CATCH() + // on error, prevent any further calls to our lset functions + plink->lset = NULL; +} + +void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink) +{ + (void)locker; + try { + std::unique_ptr self((pvaLink*)plink->value.json.jlink); + log_debug_printf(_logger, "%s: %s %s\n", __func__, plink->precord->name, self->channelName.c_str()); + assert(self->alive); + + }CATCH() +} + +int pvaIsConnected(const DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + + bool ret = self->valid(); + log_debug_printf(_logger, "%s: %s %s\n", __func__, plink->precord->name, self->channelName.c_str()); + return ret; + + }CATCH() + return 0; +} + +int pvaGetDBFtype(const DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + // if fieldName is empty, use top struct value + // if fieldName not empty + // if sub-field is struct, use sub-struct .value + // if sub-field not struct, treat as value + + auto& value(self->fld_value); + auto vtype(self->fld_value.type()); + if(vtype.isarray()) + vtype = vtype.scalarOf(); + + switch(value.type().code) { + case TypeCode::Int8: return DBF_CHAR; + case TypeCode::Int16: return DBF_SHORT; + case TypeCode::Int32: return DBF_LONG; + case TypeCode::Int64: return DBF_INT64; + case TypeCode::UInt8: return DBF_UCHAR; + case TypeCode::UInt16: return DBF_USHORT; + case TypeCode::UInt32: return DBF_ULONG; + case TypeCode::UInt64: return DBF_UINT64; + case TypeCode::Float32: return DBF_FLOAT; + case TypeCode::Float64: return DBF_DOUBLE; + case TypeCode::String: return DBF_STRING; + case TypeCode::Struct: { + if(value.id()=="enum_t" + && value["index"].type().kind()==Kind::Integer + && value["choices"].type()==TypeCode::StringA) + return DBF_ENUM; + } + // fall through + default: + return DBF_LONG; // default for un-mapable types. + } + + }CATCH() + return -1; +} + +long pvaGetElements(const DBLINK *plink, long *nelements) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + shared_array arr; + if(!self->fld_value.type().isarray()) { + *nelements = 1; + } else if(self->fld_value.as(arr)) { + *nelements = arr.size(); + } + return 0; + }CATCH() + return -1; +} + +long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, + long *pnRequest) +{ + TRY { + Guard G(self->lchan->lock); + + if(!self->valid()) { + // disconnected + (void)recGblSetSevr(plink->precord, LINK_ALARM, INVALID_ALARM); + if(self->time) { + plink->precord->time = self->snap_time; + } + log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); + return -1; + } + + auto nReq(pnRequest ? *pnRequest : 1); + auto value(self->fld_value); + + if(value.type()==TypeCode::Any) + value = value.lookup("->"); + + if(nReq <= 0 || !value) { + if(!pnRequest) { + memset(pbuffer, 0, dbValueSize(dbrType)); + nReq = 1; + } + + } else if(value.type().isarray()) { + auto arr(value.as>()); + + if(size_t(nReq) > arr.size()) + nReq = arr.size(); + + if(dbrType==DBR_STRING) { + auto sarr(arr.castTo()); // may copy+convert + + auto cbuf(reinterpret_cast(pbuffer)); + for(size_t i : range(size_t(nReq))) { + strncpy(cbuf + i*MAX_STRING_SIZE, + sarr[i].c_str(), + MAX_STRING_SIZE-1u); + cbuf[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0'; + } + + } else { + ArrayType dtype; + switch(dbrType) { + case DBR_CHAR: dtype = ArrayType::Int8; break; + case DBR_SHORT: dtype = ArrayType::Int16; break; + case DBR_LONG: dtype = ArrayType::Int32; break; + case DBR_INT64: dtype = ArrayType::Int64; break; + case DBR_UCHAR: dtype = ArrayType::UInt8; break; + case DBR_USHORT: dtype = ArrayType::UInt16; break; + case DBR_ULONG: dtype = ArrayType::UInt32; break; + case DBR_UINT64: dtype = ArrayType::UInt64; break; + case DBR_FLOAT: dtype = ArrayType::Float32; break; + case DBR_DOUBLE: dtype = ArrayType::Float64; break; + default: + log_debug_printf(_logger, "%s: %s unsupported array conversion\n", + __func__, plink->precord->name); + return S_db_badDbrtype; + } + + detail::convertArr(dtype, pbuffer, + arr.original_type(), arr.data(), + size_t(nReq)); + } + + } else { // scalar + // TODO: special case for "long string" + + if(value.type()==TypeCode::Struct && self->fld_value.id()=="enum_t") { // NTEnum + auto index(value["index"].as()); + switch(dbrType) { + case DBR_CHAR: *reinterpret_cast(pbuffer) = index; break; + case DBR_SHORT: *reinterpret_cast(pbuffer) = index; break; + case DBR_LONG: *reinterpret_cast(pbuffer) = index; break; + case DBR_INT64: *reinterpret_cast(pbuffer) = index; break; + case DBR_UCHAR: *reinterpret_cast(pbuffer) = index; break; + case DBR_USHORT: *reinterpret_cast(pbuffer) = index; break; + case DBR_ULONG: *reinterpret_cast(pbuffer) = index; break; + case DBR_UINT64: *reinterpret_cast(pbuffer) = index; break; + case DBR_FLOAT: *reinterpret_cast(pbuffer) = index; break; + case DBR_DOUBLE: *reinterpret_cast(pbuffer) = index; break; + case DBR_STRING: { + auto cbuf(reinterpret_cast(pbuffer)); + auto choices(value["choices"].as>()); + if(index>=0 && size_t(index) < choices.size()) { + auto& choice(choices[index]); + strncpy(cbuf, choice.c_str(), MAX_STRING_SIZE-1u); + + } else { + epicsSnprintf(cbuf, MAX_STRING_SIZE-1u, "%u", unsigned(index)); + } + cbuf[MAX_STRING_SIZE-1u] = '\0'; + break; + } + default: + log_debug_printf(_logger, "%s: %s unsupported enum conversion\n", + __func__, plink->precord->name); + return S_db_badDbrtype; + } + + } else { // plain scalar + switch(dbrType) { + case DBR_CHAR: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_SHORT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_LONG: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_INT64: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_UCHAR: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_USHORT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_ULONG: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_UINT64: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_FLOAT: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_DOUBLE: *reinterpret_cast(pbuffer) = value.as(); break; + case DBR_STRING: { + auto cbuf(reinterpret_cast(pbuffer)); + auto sval(value.as()); + strncpy(cbuf, sval.c_str(), MAX_STRING_SIZE-1u); + cbuf[MAX_STRING_SIZE-1u] = '\0'; + break; + } + default: + log_debug_printf(_logger, "%s: %s unsupported scalar conversion\n", + __func__, plink->precord->name); + return S_db_badDbrtype; + } + } + nReq = 1; + } + + if(pnRequest) + *pnRequest = nReq; + + if(self->fld_seconds) { + self->snap_time.secPastEpoch = self->fld_seconds.as() - POSIX_TIME_AT_EPICS_EPOCH; + if(self->fld_nanoseconds) { + self->snap_time.nsec = self->fld_nanoseconds.as(); + } else { + self->snap_time.nsec = 0u; + } + } else { + self->snap_time.secPastEpoch = 0u; + self->snap_time.nsec = 0u; + } + + if(self->fld_severity) { + self->snap_severity = self->fld_severity.as(); + } else { + self->snap_severity = NO_ALARM; + } + + if(self->fld_message && self->snap_severity!=0) { + self->snap_message = self->fld_message.as(); + } else { + self->snap_message.clear(); + } + + if((self->snap_severity!=NO_ALARM && self->sevr == pvaLink::MS) || + (self->snap_severity==INVALID_ALARM && self->sevr == pvaLink::MSI)) + { + log_debug_printf(_logger, "%s: %s recGblSetSevr %d\n", __func__, plink->precord->name, + self->snap_severity); + recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity); + } + + if(self->time) { + plink->precord->time = self->snap_time; + } + + log_debug_printf(_logger, "%s: %s %s snapsevr=%d OK\n", __func__, plink->precord->name, + self->channelName.c_str(), self->snap_severity); + return 0; + }CATCH() + return -1; +} + +long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(lo) + (void)self->fld_meta["control.limitLow"].as(*lo); + if(hi) + (void)self->fld_meta["control.limitHigh"].as(*hi); + + log_debug_printf(_logger, "%s: %s %s %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(lo) + (void)self->fld_meta["display.limitLow"].as(*lo); + if(hi) + (void)self->fld_meta["display.limitHigh"].as(*hi); + + log_debug_printf(_logger, "%s: %s %s %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo, + double *hi, double *hihi) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(lolo) + (void)self->fld_meta["valueAlarm.lowAlarmLimit"].as(*lolo); + if(lo) + (void)self->fld_meta["valueAlarm.lowWarningLimit"].as(*lo); + if(hi) + (void)self->fld_meta["valueAlarm.highWarningLimit"].as(*hi); + if(hihi) + (void)self->fld_meta["valueAlarm.highAlarmLimit"].as(*hihi); + + + log_debug_printf(_logger, "%s: %s %s %f %f %f %f\n", + __func__, plink->precord->name, self->channelName.c_str(), + lo ? *lo : 0, lolo ? *lolo : 0, hi ? *hi : 0, hihi ? *hihi : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetPrecision(const DBLINK *plink, short *precision) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + uint16_t prec = 0; + (void)self->fld_meta["display.precision"].as(prec); + if(precision) + *precision = prec; + + log_debug_printf(_logger, "%s: %s %s %i\n", __func__, plink->precord->name, self->channelName.c_str(), prec); + return 0; + }CATCH() + return -1; +} + +long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(!units || unitsSize==0) return 0; + + + std::string egu; + (void)self->fld_meta["display.units"].as(egu); + strncpy(units, egu.c_str(), unitsSize-1); + units[unitsSize-1] = '\0'; + + log_debug_printf(_logger, "%s: %s %s %s\n", __func__, plink->precord->name, self->channelName.c_str(), units); + return 0; + }CATCH() + return -1; +} + +long pvaGetAlarmMsg(const DBLINK *plink, + epicsEnum16 *status, epicsEnum16 *severity, + char *msgbuf, size_t msgbuflen) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(severity) { + *severity = self->snap_severity; + } + if(status) { + *status = self->snap_severity ? LINK_ALARM : NO_ALARM; + } + if(msgbuf && msgbuflen) { + if(self->snap_message.empty()) { + msgbuf[0] = '\0'; + } else { + epicsSnprintf(msgbuf, msgbuflen-1u, "%s", self->snap_message.c_str()); + msgbuf[msgbuflen-1u] = '\0'; + } + } + log_debug_printf(_logger, "%s: %s %s %i %i\n", + __func__, plink->precord->name, self->channelName.c_str(), severity ? *severity : 0, status ? *status : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, + epicsEnum16 *severity) +{ + return pvaGetAlarmMsg(plink, status, severity, nullptr, 0); +} + +long pvaGetTimeStampTag(const DBLINK *plink, epicsTimeStamp *pstamp, epicsUTag *ptag) +{ + TRY { + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(pstamp) { + *pstamp = self->snap_time; + } + if(ptag) { + *ptag = self->snap_tag; + } + log_debug_printf(_logger, "%s: %s %s %i %i\n", __func__, plink->precord->name, self->channelName.c_str(), pstamp ? pstamp->secPastEpoch : 0, pstamp ? pstamp->nsec : 0); + return 0; + }CATCH() + return -1; +} + +long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp) +{ + return pvaGetTimeStampTag(plink, pstamp, nullptr); +} + +long pvaPutValueX(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest, bool wait) +{ + TRY { + (void)self; + Guard G(self->lchan->lock); + + if(nRequest < 0) return -1; + + if(!self->retry && !self->valid()) { + log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); + return -1; + } + + shared_array buf; + + if(dbrType == DBF_STRING) { + const char *sbuffer = (const char*)pbuffer; + shared_array sval(nRequest); + + for(long n=0; nput_scratch = sval.freeze().castTo(); + + } else { + ArrayType dtype; + switch(dbrType) { + case DBR_CHAR: dtype = ArrayType::Int8; break; + case DBR_SHORT: dtype = ArrayType::Int16; break; + case DBR_LONG: dtype = ArrayType::Int32; break; + case DBR_INT64: dtype = ArrayType::Int64; break; + case DBR_UCHAR: dtype = ArrayType::UInt8; break; + case DBR_USHORT: dtype = ArrayType::UInt16; break; + case DBR_ULONG: dtype = ArrayType::UInt32; break; + case DBR_UINT64: dtype = ArrayType::UInt64; break; + case DBR_FLOAT: dtype = ArrayType::Float32; break; + case DBR_DOUBLE: dtype = ArrayType::Float64; break; + default: + return S_db_badDbrtype; + } + + auto val(detail::copyAs(dtype, dtype, pbuffer, size_t(nRequest))); + + self->put_scratch = val.freeze().castTo(); + } + + self->used_scratch = true; + + if(wait) + self->lchan->after_put.insert(plink->precord); + + if(!self->defer) self->lchan->put(); + + log_debug_printf(_logger, "%s: %s %s %s\n", __func__, plink->precord->name, self->channelName.c_str(), self->lchan->root.valid() ? "valid": "not valid"); + + return 0; + }CATCH() + return -1; +} + +long pvaPutValue(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, false); +} + +long pvaPutValueAsync(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, true); +} + +void pvaScanForward(DBLINK *plink) +{ + TRY { + Guard G(self->lchan->lock); + + if(!self->retry && !self->valid()) { + (void)recGblSetSevrMsg(plink->precord, LINK_ALARM, INVALID_ALARM, "Disconn"); + return; + } + + // FWD_LINK is never deferred, and always results in a Put + self->lchan->put(true); + + log_debug_printf(_logger, "%s: %s %s %s\n", + __func__, plink->precord->name, self->channelName.c_str(), self->lchan->root.valid() ? "valid": "not valid"); + }CATCH() +} + +#if EPICS_VERSION_INT>=VERSION_INT(3,16,1,0) +long pvaDoLocked(struct link *plink, dbLinkUserCallback rtn, void *priv) +{ + TRY { + Guard G(self->lchan->lock); + return (*rtn)(plink, priv); + }CATCH() + return 1; +} +#endif // >= 3.16.1 + +#undef TRY +#undef CATCH + +} //namespace + +lset pva_lset = { + 0, 1, // non-const, volatile + &pvaOpenLink, + &pvaRemoveLink, + NULL, NULL, NULL, + &pvaIsConnected, + &pvaGetDBFtype, + &pvaGetElements, + &pvaGetValue, + &pvaGetControlLimits, + &pvaGetGraphicLimits, + &pvaGetAlarmLimits, + &pvaGetPrecision, + &pvaGetUnits, + &pvaGetAlarm, + &pvaGetTimeStamp, + &pvaPutValue, + &pvaPutValueAsync, + &pvaScanForward, +#if EPICS_VERSION_INT>=VERSION_INT(3,16,1,0) + &pvaDoLocked, +#endif +#if EPICS_VERSION_INT>=VERSION_INT(7,0,6,0) + &pvaGetAlarmMsg, + &pvaGetTimeStampTag, +#endif +}; + +}} // namespace pvxs::ioc diff --git a/ioc/pvxs/iochooks.h b/ioc/pvxs/iochooks.h index ca090dbd3..29d5f2642 100644 --- a/ioc/pvxs/iochooks.h +++ b/ioc/pvxs/iochooks.h @@ -100,5 +100,71 @@ void testPrepare(); PVXS_IOC_API void testShutdown(); +/** Call just after testIocShutdownOk() + * @since UNRELEASED + */ +PVXS_IOC_API +void testAfterShutdown(); + +/** Call just before testdbCleanup() + * @since UNRELEASED + */ +PVXS_IOC_API +void testCleanupPrepare(); + +#if EPICS_VERSION_INT >= VERSION_INT(3, 15, 0 ,0) + +/** Manage Test IOC life-cycle calls. + * + * Makes necessary calls to dbUnitTest.h API + * as well as any added calls needed by PVXS components. + * + @code + * MAIN(mytest) { + * testPlan(0); + * pvxs::testSetup(); + * pvxs::logger_config_env(); // (optional) + * { + * TestIOC ioc; // testdbPrepare() + * + * // mytestioc.dbd must include pvxsIoc.dbd + * testdbReadDatabase("mytestioc.dbd", NULL, NULL); + * mytestioc_registerRecordDeviceDriver(pdbbase); + * testdbReadDatabase("sometest.db", NULL, NULL); + * + * // tests before iocInit() + * + * ioc.init(); + * + * // tests after iocInit() + * + * ioc.shutdown(); // (optional) in ~TestIOC if omitted + * } + * { + * ... repeat ... + * } + * epicsExitCallAtExits(); + * cleanup_for_valgrind(); + * } + @endcode + * + * @since UNRELEASED + */ +class PVXS_IOC_API TestIOC final { + bool isRunning = false; +public: + TestIOC(); + ~TestIOC(); + //! iocInit() + void init(); + //! iocShutdown() + void shutdown(); + //! between iocInit() and iocShutdown() ? + inline + bool running() const { return isRunning; } +}; + +#endif // base >= 3.15 + }} // namespace pvxs::ioc #endif // PVXS_IOCHOOKS_H diff --git a/ioc/pvxsIoc.dbd b/ioc/pvxs3x.dbd similarity index 74% rename from ioc/pvxsIoc.dbd rename to ioc/pvxs3x.dbd index 6330d4a3e..b5273d354 100644 --- a/ioc/pvxsIoc.dbd +++ b/ioc/pvxs3x.dbd @@ -1,6 +1,4 @@ registrar(pvxsBaseRegistrar) -registrar(pvxsSingleSourceRegistrar) -registrar(pvxsGroupSourceRegistrar) # from demo.cpp device(waveform, CONSTANT, devWfPDBQ2Demo, "QSRV2 Demo") diff --git a/ioc/pvxs7x.dbd b/ioc/pvxs7x.dbd new file mode 100644 index 000000000..aa4957652 --- /dev/null +++ b/ioc/pvxs7x.dbd @@ -0,0 +1,8 @@ +registrar(pvxsBaseRegistrar) +link("pva", "lsetPVA") + +# from demo.cpp +device(waveform, CONSTANT, devWfPDBQ2Demo, "QSRV2 Demo") +device(longin, CONSTANT, devLoPDBQ2UTag, "QSRV2 Set UTag") +# from imagedemo.c +function(QSRV2_image_demo) diff --git a/ioc/qsrvpvt.h b/ioc/qsrvpvt.h new file mode 100644 index 000000000..feaab0403 --- /dev/null +++ b/ioc/qsrvpvt.h @@ -0,0 +1,67 @@ +/* Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +#ifndef QSRVPVT_H +#define QSRVPVT_H + +#include +#include + +namespace pvxs { +namespace ioc { + +#if EPICS_VERSION_INT >= VERSION_INT(3, 15, 0 ,0) +# define USE_QSRV_SINGLE +void single_enable(); +void dbRegisterQSRV2(); +void addSingleSrc(); +#else +static inline void single_enable() {} +static inline void dbRegisterQSRV2() {} +static inline void addSingleSrc() {} +#endif + +#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 0 ,0) +# define USE_PVA_LINKS +void group_enable(); +void pvalink_enable(); +void processGroups(); +void addGroupSrc(); +void resetGroups(); +#else +static inline void group_enable() {} +static inline void pvalink_enable() {} +static inline void processGroups() {} +static inline void addGroupSrc() {} +static inline void resetGroups() {} +#endif + +#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 4, 0) +# define USE_DEINIT_HOOKS +#endif +#if EPICS_VERSION_INT > VERSION_INT(7, 0, 7, 0) +# define USE_PREPARE_CLEANUP_HOOKS +#endif + +#ifdef USE_PVA_LINKS +// test utilities for PVA links + +PVXS_IOC_API +void testqsrvWaitForLinkConnected(struct link *plink, bool conn=true); +PVXS_IOC_API +void testqsrvWaitForLinkConnected(const char* pv, bool conn=true); + +class PVXS_IOC_API QSrvWaitForLinkUpdate final { + struct link * const plink; + unsigned seq; +public: + QSrvWaitForLinkUpdate(struct link *plink); + QSrvWaitForLinkUpdate(const char* pv); + ~QSrvWaitForLinkUpdate(); +}; +#endif + +}} // namespace pvxs::ioc + +#endif // QSRVPVT_H diff --git a/ioc/singlesource.cpp b/ioc/singlesource.cpp index 66bc0e14f..1fd292cdd 100644 --- a/ioc/singlesource.cpp +++ b/ioc/singlesource.cpp @@ -102,6 +102,33 @@ void onSubscribe(const std::shared_ptr& subscriptio const DBEventContext& eventContext, std::unique_ptr&& subscriptionOperation) { + auto pvReq(subscriptionOperation->pvRequest()); + unsigned dbe = 0; + if(auto fld = pvReq["record._options.DBE"].ifMarked()) { + switch(fld.type().kind()) { + case Kind::String: { + auto mask(fld.as()); + // name and sloppy parsing a la. caProvider... +#define CASE(EVENT) if(mask.find(#EVENT)!=mask.npos) dbe |= DBE_ ## EVENT + CASE(VALUE); + CASE(ARCHIVE); + CASE(ALARM); +// CASE(PROPERTY); // handled as special case +#undef CASE + break; + } + case Kind::Integer: + case Kind::Real: + dbe = fld.as(); + break; + default: + break; + } + } + dbe &= (DBE_VALUE | DBE_ARCHIVE | DBE_ALARM); + if(!dbe) + dbe = DBE_VALUE | DBE_ALARM; + // inform peer of data type and acquire control of the subscription queue subscriptionContext->subscriptionControl = subscriptionOperation->connect(subscriptionContext->currentValue); @@ -115,7 +142,7 @@ void onSubscribe(const std::shared_ptr& subscriptio subscriptionContext->info->chan, subscriptionValueCallback, subscriptionContext.get(), - DBE_VALUE | DBE_ALARM | DBE_ARCHIVE + dbe ); // second subscription is for Property changes subscriptionContext->pPropertiesEventSubscription.subscribe(eventContext.get(), diff --git a/ioc/singlesourcehooks.cpp b/ioc/singlesourcehooks.cpp index e86a278a8..2dd720ddb 100644 --- a/ioc/singlesourcehooks.cpp +++ b/ioc/singlesourcehooks.cpp @@ -21,6 +21,7 @@ #include #include +#include "qsrvpvt.h" #include "iocshcommand.h" #include "singlesource.h" @@ -142,48 +143,29 @@ dbServer qsrv2Server = { qClient, }; -/** - * Initialise qsrv database single records by adding them as sources in our running pvxs server instance - * - * @param theInitHookState the initHook state - we only want to trigger on the initHookAfterIocBuilt state - ignore all others - */ -void qsrvSingleSourceInit(initHookState theInitHookState) { - if(!IOCSource::enabled()) - return; - if (theInitHookState == initHookAtBeginning) { - (void)dbRegisterServer(&qsrv2Server); - } else - if (theInitHookState == initHookAfterIocBuilt) { - pvxs::ioc::server().addSource("qsrvSingle", std::make_shared(), 0); - } +} // namespace + +namespace pvxs { +namespace ioc { + +void dbRegisterQSRV2() +{ + (void)dbRegisterServer(&qsrv2Server); } -/** - * IOC pvxs Single Source registrar. This implements the required registrar function that is called by xxxx_registerRecordDeviceDriver, - * the auto-generated stub created for all IOC implementations. - * - * It is registered by using the `epicsExportRegistrar()` macro. - * - * 1. Specify here all of the commands that you want to be registered and available in the IOC shell. - * 2. Register your hook handler to handle any state hooks that you want to implement. Here we install - * an `initHookState` handler connected to the `initHookAfterIocBuilt` state. It will add all of the - * single record type sources defined so far. Note that you can define sources up until the `iocInit()` call, - * after which point the `initHookAfterIocBuilt` handlers are called and will register all the defined records. - */ -void pvxsSingleSourceRegistrar() { +void addSingleSrc() +{ + pvxs::ioc::server() + .addSource("qsrvSingle", std::make_shared(), 0); +} + +void single_enable() { // Register commands to be available in the IOC shell IOCShCommand("pvxsl", "details", "List PV names.\n") .implementation<&pvxsl>(); - - initHookRegister(&qsrvSingleSourceInit); } -} // namespace +}} // namespace pvxs::ioc -// in .dbd file -//registrar(pvxsSingleSourceRegistrar) -extern "C" { -epicsExportRegistrar(pvxsSingleSourceRegistrar); -} diff --git a/ioc/subscriptionctx.h b/ioc/subscriptionctx.h index db99c5ed6..2b3f3246a 100644 --- a/ioc/subscriptionctx.h +++ b/ioc/subscriptionctx.h @@ -39,7 +39,8 @@ class Subscription { user_sub, user_arg, select), [chan](dbEventSubscription sub) mutable { - db_cancel_event(sub); + if(sub) + db_cancel_event(sub); chan = Channel(); // dbChannel* must outlive subscription }); if(!sub) diff --git a/qsrv/softMain.cpp b/qsrv/softMain.cpp index c5e3d179d..eb98d3231 100644 --- a/qsrv/softMain.cpp +++ b/qsrv/softMain.cpp @@ -141,9 +141,6 @@ int main(int argc, char *argv[]) bool loadedDb = false; bool ranScript = false; - if(!getenv("PVXS_QSRV_ENABLE")) - epicsEnvSet("PVXS_QSRV_ENABLE","YES"); - #if EPICS_VERSION_INT >= VERSION_INT(7, 0, 3, 1) // attempt to compute relative paths { diff --git a/setup.py b/setup.py index ec3b9d1bf..d3a413561 100755 --- a/setup.py +++ b/setup.py @@ -593,6 +593,11 @@ def define_DSOS(self): "ioc/singlesourcehooks.cpp", "ioc/singlesrcsubscriptionctx.cpp", "ioc/typeutils.cpp", + "ioc/pvalink_channel.cpp", + "ioc/pvalink.cpp", + "ioc/pvalink_jlif.cpp", + "ioc/pvalink_link.cpp", + "ioc/pvalink_lset.cpp", ] probe = ProbeToolchain() diff --git a/test/Makefile b/test/Makefile index 2abca671e..9cd3cb321 100644 --- a/test/Makefile +++ b/test/Makefile @@ -11,6 +11,7 @@ include $(TOP)/configure/CONFIG_PVXS_VERSION # access to private headers USR_CPPFLAGS += -I$(TOP)/src +USR_CPPFLAGS += -I$(TOP)/ioc PROD_LIBS = pvxs Com @@ -120,6 +121,13 @@ ifdef BASE_7_0 TESTPROD_HOST += benchdata benchdata_SRCS += benchdata.cpp +TESTPROD_HOST += testpvalink +testpvalink_SRCS += testpvalink.cpp +testpvalink_SRCS += testioc_registerRecordDeviceDriver.cpp +testpvalink_LIBS += pvxsIoc pvxs $(EPICS_BASE_IOC_LIBS) +TESTS += testpvalink +TESTFILES += ../testpvalink.db + endif ifdef BASE_3_15 diff --git a/test/testioc.h b/test/testioc.h index 39f199402..3ed6fb4b3 100644 --- a/test/testioc.h +++ b/test/testioc.h @@ -16,31 +16,6 @@ #include #include -class TestIOC { - bool running = false; -public: - TestIOC() { - testdbPrepare(); - pvxs::ioc::testPrepare(); - } - void init() { - if(!running) { - testIocInitOk(); - running = true; - } - } - void shutdown() { - if(running) { - pvxs::ioc::testShutdown(); - testIocShutdownOk(); - } - } - ~TestIOC() { - this->shutdown(); - testdbCleanup(); - } -}; - struct TestClient : pvxs::client::Context { TestClient() : pvxs::client::Context(pvxs::ioc::server().clientConfig().build()) {} diff --git a/test/testpvalink.cpp b/test/testpvalink.cpp new file mode 100644 index 000000000..11d2b49ed --- /dev/null +++ b/test/testpvalink.cpp @@ -0,0 +1,524 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define PVXS_ENABLE_EXPERT_API + +#include +#include +#include +#include +#include +#include +#include + +#include "dblocker.h" +#include "qsrvpvt.h" +#include "pvalink.h" + +using namespace pvxs::ioc; +using namespace pvxs; + +namespace { + struct TestMonitor { + testMonitor * const mon; + TestMonitor(const char* pvname, unsigned dbe_mask, unsigned opt=0) + :mon(testMonitorCreate(pvname, dbe_mask, opt)) + {} + ~TestMonitor() { testMonitorDestroy(mon); } + void wait() { testMonitorWait(mon); } + unsigned count(bool reset=true) { return testMonitorCount(mon, reset); } + }; + + void testGet() + { + testDiag("==== testGet ===="); + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbGetFieldEqual("target:i.VAL", DBF_LONG, 42L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 0L); // value before first process + + testdbGetFieldEqual("src:i1.INP", DBF_STRING, "{\"pva\":\"target:i\"}"); + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 42L); + + testdbPutFieldOk("src:i1.INP", DBF_STRING, "{\"pva\":\"target:ai\"}"); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 42L); // changing link doesn't automatically process + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); // now it's changed + } + + void testFieldLinks() { + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testDiag("==== Test field links ===="); + + std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"field\":\"display.precision\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); // changing link doesn't automatically process + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 2L); // changing link doesn't automatically process + + } + + void testProc() + { + + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testDiag("==== Test proc settings ===="); + + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 2L); + + // Set it to CPP + std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"proc\":\"CPP\"}}"; + { + TestMonitor m("src:i1", DBE_VALUE); + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + // wait for initial scan + m.wait(); + } + + // Link should read current value of target. + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); + + { + QSrvWaitForLinkUpdate C(&i1->inp); + testdbPutFieldOk("target:ai", DBF_FLOAT, 5.0); + } + + // now it's changed + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 5L); + } + + void testSevr() + { + longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); + + testDiag("==== Test severity forwarding (NMS, MS, MSI) ===="); + + std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"NMS\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbPutFieldOk("target:ai.LOLO", DBF_FLOAT, 5.0); + testdbPutFieldOk("target:ai.LLSV", DBF_STRING, "MAJOR"); + testdbPutFieldOk("target:ai", DBF_FLOAT, 0.0); + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevNone); + + pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"MS\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevMajor); + + pv_name = "{\"pva\":{\"pv\":\"target:mbbi\",\"sevr\":\"MSI\"}}"; + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); + + testqsrvWaitForLinkConnected(&i1->inp); + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevNone); + + { + QSrvWaitForLinkUpdate C(&i1->inp); + testdbPutFieldOk("target:ai", DBF_FLOAT, 1.0); + } + + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevInvalid); + } + + void testPut() + { + testDiag("==== testPut ===="); + + longoutRecord *o2 = (longoutRecord *)testdbRecordPtr("src:o2"); + + testqsrvWaitForLinkConnected(&o2->out); + + testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 43L); + testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 0L); + testdbGetFieldEqual("src:o2.OUT", DBF_STRING, "{\"pva\":\"target:i2\"}"); + + { + QSrvWaitForLinkUpdate C(&o2->out); + testdbPutFieldOk("src:o2.VAL", DBF_LONG, 14L); + } + + testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 14L); + testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 14L); + } + + void testStrings() + { + testDiag("==== testStrings ===="); + + stringoutRecord *so = (stringoutRecord *)testdbRecordPtr("src:str"); + + testdbGetFieldEqual("target:str1", DBF_STRING, "foo"); + testdbPutFieldOk("target:str1.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("target:str1", DBF_STRING, "bar"); + + testdbPutFieldOk("src:str.OUT", DBF_STRING, R"({"pva" : "target:str2"})"); + + testqsrvWaitForLinkConnected(&so->out); + + { + QSrvWaitForLinkUpdate C(&so->out); + testdbPutFieldOk("src:str.PROC", DBF_LONG, 1L); + } + + testdbGetFieldEqual("target:str2", DBF_STRING, "bar"); + } + + void testToFromString() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("testToFromString:src.OUT"); + testqsrvWaitForLinkConnected("testToFromString:str2.INP"); + testqsrvWaitForLinkConnected("testToFromString:out.INP"); + + { + QSrvWaitForLinkUpdate C("testToFromString:out.INP"); + testdbPutFieldOk("testToFromString:src", DBR_LONG, 43); + } + + testdbGetFieldEqual("testToFromString:str1", DBR_STRING, "43"); + testdbGetFieldEqual("testToFromString:str2", DBR_STRING, "43"); + testdbGetFieldEqual("testToFromString:out", DBR_LONG, 43); + } + + void testArrays() + { + auto aai_inp = (aaiRecord *)testdbRecordPtr("target:aai_inp"); + testDiag("==== testArrays ===="); + static const epicsFloat32 input_arr[] = {1, 2, -1, 1.2, 0}; + { + QSrvWaitForLinkUpdate C(&aai_inp->inp); + testdbPutArrFieldOk("source:aao", DBR_FLOAT, 5, input_arr); + } + + // underlying channel cache updated, but record has not be re-processed + testdbGetArrFieldEqual("target:aai_inp", DBF_CHAR, 10, 0, NULL); + + static const epicsInt8 expected_char[] = {1, 2, -1, 1, 0}; + testdbPutFieldOk("target:aai_inp.PROC", DBF_LONG, 1L); + testdbGetArrFieldEqual("target:aai_inp", DBF_CHAR, 10, 5, expected_char); + + static const epicsUInt32 expected_ulong[] = {1L, 2L, 4294967295L, 1L, 0}; + testdbGetArrFieldEqual("target:aai_inp", DBF_ULONG, 10, 5, expected_ulong); + + testqsrvWaitForLinkConnected("target:aai_inp_first.INP"); + testdbPutFieldOk("target:aai_inp_first.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("target:aai_inp_first", DBR_DOUBLE, 1.0); + } + + void testStringArray() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("sarr:inp.INP"); + + const char expect[3][MAX_STRING_SIZE] = {"one", "two", "three"}; + { + QSrvWaitForLinkUpdate U("sarr:inp.INP"); + + testdbPutArrFieldOk("sarr:src", DBR_STRING, 3, expect); + } + + testdbPutFieldOk("sarr:inp.PROC", DBR_LONG, 0); + + testdbGetArrFieldEqual("sarr:inp", DBR_STRING, 4, 3, expect); + } + + void testPutAsync() + { + testDiag("==== testPutAsync ===="); + + auto trig = (longoutRecord *)testdbRecordPtr("async:trig"); + auto seq = (calcRecord *)testdbRecordPtr("async:seq"); + + testqsrvWaitForLinkConnected(&trig->out); + + TestMonitor done("async:seq", DBE_VALUE, 0); + + testdbPutFieldOk("async:trig.PROC", DBF_LONG, 1); + dbScanLock((dbCommon*)seq); + while(seq->val < 2) { + dbScanUnlock((dbCommon*)seq); + done.wait(); + dbScanLock((dbCommon*)seq); + } + dbScanUnlock((dbCommon*)seq); + + testdbGetFieldEqual("async:target", DBF_LONG, 1); + testdbGetFieldEqual("async:next", DBF_LONG, 2); + testdbGetFieldEqual("async:seq", DBF_LONG, 2); + } + + void testDisconnect() + { + testDiag("==== %s ====", __func__); + auto serv(ioc::server()); + + testdbPutFieldFail(-1, "disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevInvalid); + + auto special(server::SharedPV::buildReadonly()); + special.open(nt::NTScalar{TypeCode::Int32}.create() + .update("value", 43)); + serv.addPV("special:pv", special); + + testqsrvWaitForLinkConnected("disconnected.INP"); + + testdbPutFieldOk("disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevNone); + + serv.removePV("special:pv"); + special.close(); + + testqsrvWaitForLinkConnected("disconnected.INP", false); + + testdbPutFieldFail(-1, "disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevInvalid); + + testdbPutFieldOk("disconnected.INP", DBR_STRING, ""); // avoid further log messages + } + + void testMeta() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("meta:inp.INP"); + + { + auto src = (aiRecord*)testdbRecordPtr("meta:src"); + QSrvWaitForLinkUpdate U("meta:inp.INP"); + dbScanLock((dbCommon*)src); + src->tse = epicsTimeEventDeviceTime; + src->time.secPastEpoch = 0x12345678; + src->time.nsec = 0x10203040; + src->val = 7; + dbProcess((dbCommon*)src); + dbScanUnlock((dbCommon*)src); + } + auto inp = (aiRecord*)testdbRecordPtr("meta:inp"); + + long ret, nelem; + epicsEnum16 stat, sevr; + epicsTimeStamp time; + char egu[10] = ""; + short prec; + double val, lolo, low, high, hihi; + + dbScanLock((dbCommon*)inp); + + testTrue(dbIsLinkConnected(&inp->inp)!=0); + + testEq(dbGetLinkDBFtype(&inp->inp), DBF_DOUBLE); + + // alarm and time meta-data will be "latched" by a call to dbGetLink. + // until then, the initial values are used + + testTrue((ret=dbGetAlarm(&inp->inp, &stat, &sevr))==0 + && stat==LINK_ALARM && sevr==INVALID_ALARM) + <<" ret="<inp, &nelem))==0 && nelem==1) + <<" ret="<val==0) { + dbScanUnlock(prec); + mon.wait(); + dbScanLock(prec); + } + dbScanUnlock(prec); + } + + testdbGetFieldEqual("flnk:tgt", DBF_LONG, 1); + } + + void testAtomic() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("atomic:lnk:1.INP"); + testqsrvWaitForLinkConnected("atomic:lnk:2.INP"); + + { + QSrvWaitForLinkUpdate A("atomic:lnk:1.INP"); + QSrvWaitForLinkUpdate B("atomic:lnk:2.INP"); + + testdbPutFieldOk("atomic:src:1.PROC", DBR_LONG, 0); + } + + epicsUInt32 expect; + { + auto src1(testdbRecordPtr("atomic:src:1")); + dbScanLock(src1); + expect = ((calcoutRecord*)src1)->val; + testEq(expect & ~0xff, 0u); + expect |= expect<<8u; + dbScanUnlock(src1); + } + + testdbGetFieldEqual("atomic:lnk:out", DBF_ULONG, expect); + } + + void testEnum() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("enum:src:b.OUT"); + testqsrvWaitForLinkConnected("enum:src:s.OUT"); + testqsrvWaitForLinkConnected("enum:tgt:s.INP"); + testqsrvWaitForLinkConnected("enum:tgt:b.INP"); + + { + QSrvWaitForLinkUpdate A("enum:tgt:b.INP"); // last in chain... + + testdbPutFieldOk("enum:src:b", DBR_STRING, "one"); + } + + testdbGetFieldEqual("enum:tgt:s", DBR_STRING, "one"); + // not clear how to handle this case, where a string is + // read as DBR_USHORT, which is actually as DBF_ENUM + testTodoBegin("Not yet implimented"); + testdbGetFieldEqual("enum:tgt:b", DBR_STRING, "one"); + testTodoEnd(); + } +} // namespace + +extern "C" void testioc_registerRecordDeviceDriver(struct dbBase *); + +MAIN(testpvalink) +{ + testPlan(92); + testSetup(); + pvxs::logger_config_env(); + + try + { + TestIOC IOC; + + testdbReadDatabase("testioc.dbd", NULL, NULL); + testioc_registerRecordDeviceDriver(pdbbase); + testdbReadDatabase("testpvalink.db", NULL, NULL); + + IOC.init(); + + testGet(); + testFieldLinks(); + testProc(); + testSevr(); + testPut(); + testStrings(); + testToFromString(); + testArrays(); + testStringArray(); + testPutAsync(); + testDisconnect(); + testMeta(); + testFwd(); + testAtomic(); + testEnum(); + } + catch (std::exception &e) + { + testFail("Unexpected exception: %s", e.what()); + } + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); + cleanup_for_valgrind(); + return testDone(); +} diff --git a/test/testpvalink.db b/test/testpvalink.db new file mode 100644 index 000000000..082a885fc --- /dev/null +++ b/test/testpvalink.db @@ -0,0 +1,206 @@ + +# used by testGet(), testFieldLinks, testProc, testSevr +record(longin, "target:i") { + field(VAL, "42") +} +record(ai, "target:ai") { + field(VAL, "4.0") + field(FLNK, "target:mbbi") + field(PREC, "2") +} + +record(longin, "src:i1") { + field(INP, {"pva":"target:i"}) + field(MDEL, "-1") + field(TPRO, "1") +} + +record(mbbi, "target:mbbi") { + field(INP, "target:ai") + field(ZRSV, "NO_ALARM") + field(ONSV, "INVALID") +} + +# used by testPut() +record(longin, "target:i2") { + field(VAL, "43") +} + +record(longout, "src:o2") { + field(OUT, {"pva":"target:i2"}) +} + +# used by testPutAsync() +record(calc, "async:seq") { + field(CALC, "VAL+1") + field(VAL , "0") + field(TPRO, "1") +} + +record(stringin, "target:str1") { + field(INP, {pva: "src:str"}) + field(VAL, "foo") +} + +record(stringin, "target:str2") { + field(VAL, "baz") +} + +record(stringout, "src:str") { + field(VAL, "bar") +} + +record(longout, "testToFromString:src") { + field(VAL , "42") + field(OUT , {pva:"testToFromString:str1"}) +} +record(stringin, "testToFromString:str1") { +} +record(aai, "testToFromString:str2") { + field(FTVL, "STRING") + field(NELM, "5") + field(INP , {pva:{pv:"testToFromString:str1", "proc":"CPP"}}) +} +record(longin, "testToFromString:out") { + field(INP , {pva:{pv:"testToFromString:str2", "proc":"CPP"}}) +} + +record(calc, "async:seq") { + field(CALC, "VAL+1") +} + +record(longout, "async:trig") { + field(DTYP, "Async Soft Channel") + field(OUT , {"pva":{"pv":"async:target", "proc":true}}) + field(FLNK, "async:next") +} + +record(longin, "async:target") { + field(INP , "async:seq PP MS") +} + +record(longin, "async:next") { + field(INP , "async:seq PP MS") +} + +record(aao, "source:aao") { + field(OUT, {pva: "target:aai_out"}) + field(NELM, "5") + field(FTVL, "FLOAT") +} + +record(aai, "target:aai_inp") { + field(INP, {pva: "source:aao"}) + field(NELM, "10") + field(FTVL, "SHORT") +} + +record(aai, "target:aai_out") { + field(NELM, "2") + field(FTVL, "ULONG") +} + +record(ai, "target:aai_inp_first") { + field(INP, {pva: "source:aao"}) +} + +record(longin, "disconnected") { + field(INP, {pva: "special:pv"}) + field(VAL, "42") +} + +record(ao, "meta:src") { + field(DRVH, "10") + field(HOPR, "9") + field(HIHI, "8") + field(HIGH, "7") + field(LOW , "-7") + field(LOLO, "-8") + field(LOPR, "-9") + field(DRVL, "-10") + field(HHSV, "MAJOR") + field(HSV , "MINOR") + field(LSV , "MINOR") + field(LLSV, "MAJOR") + field(PREC, "2") + field(EGU , "arb") +} + +record(ai, "meta:inp") { + field(INP, {pva:{pv:"meta:src", sevr:"MS"}}) +} + +record(longout, "flnk:src") { + field(FLNK, {pva:"flnk:tgt"}) +} + +record(calc, "flnk:tgt") { + field(CALC, "VAL+1") +} + + +record(calcout, "atomic:src:1") { + field(CALC, "RNDM*255") + field(OUT , "atomic:src:2.A PP") + info(Q:group, { + "atomic:src":{ + "a": {+channel:"VAL"} + } + }) +} +record(calc, "atomic:src:2") { + field(CALC, "A<<8") + info(Q:group, { + "atomic:src":{ + "b": {+channel:"VAL", +trigger:"*"} + } + }) +} + +record(longin, "atomic:lnk:1") { + field(INP , { + pva:{pv:"atomic:src", field:"a", atomic:true, monorder:0, proc:"CP"} + }) +} +record(longin, "atomic:lnk:2") { + field(INP , { + pva:{pv:"atomic:src", field:"b", atomic:true, monorder:1, proc:"CP"} + }) + field(FLNK, "atomic:lnk:out") +} +record(calc, "atomic:lnk:out") { + field(INPA, "atomic:lnk:1 NPP MS") + field(INPB, "atomic:lnk:2 NPP MS") + field(CALC, "A|B") +} + +record(bo, "enum:src:b") { + field(OUT , {pva:{pv:"enum:tgt", proc:"PP"}}) + field(ZNAM, "zero") + field(ONAM, "one") +} +record(stringout, "enum:src:s") { + field(OUT , {pva:{pv:"enum:tgt", proc:"PP"}}) +} +record(bi, "enum:tgt") { + field(ZNAM, "zero") + field(ONAM, "one") +} +record(stringin, "enum:tgt:s") { + field(INP , {pva:{pv:"enum:tgt", proc:"CP"}}) +} +record(bi, "enum:tgt:b") { + field(INP , {pva:{pv:"enum:tgt:s", proc:"CP"}}) + field(ZNAM, "zero") + field(ONAM, "one") +} + +record(waveform, "sarr:src") { + field(FTVL, "STRING") + field(NELM, "16") +} +record(waveform, "sarr:inp") { + field(INP , {pva:"sarr:src"}) + field(FTVL, "STRING") + field(NELM, "16") +} diff --git a/test/testqgroup.cpp b/test/testqgroup.cpp index a04ea6a05..08d7e7a2c 100644 --- a/test/testqgroup.cpp +++ b/test/testqgroup.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "testioc.h" @@ -721,7 +722,7 @@ MAIN(testqgroup) testPlan(37); testSetup(); { - TestIOC ioc; + ioc::TestIOC ioc; asSetFilename("../testioc.acf"); generalTimeRegisterCurrentProvider("test", 1, &testTimeCurrent); testdbReadDatabase("testioc.dbd", nullptr, nullptr); @@ -740,6 +741,8 @@ MAIN(testqgroup) testIQ(); testConst(); } + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); cleanup_for_valgrind(); return testDone(); } diff --git a/test/testqsingle.cpp b/test/testqsingle.cpp index 03e4a7213..201b9b07b 100644 --- a/test/testqsingle.cpp +++ b/test/testqsingle.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -880,13 +881,27 @@ void testMonitorAIFilt(TestClient& ctxt) MAIN(testqsingle) { - testPlan(87); + testPlan(88); testSetup(); pvxs::logger_config_env(); + generalTimeRegisterCurrentProvider("test", 1, &testTimeCurrent); +#if EPICS_VERSION_INT>=VERSION_INT(7, 0, 0, 0) + // start up once to check shutdown and re-start { - TestIOC ioc; + ioc::TestIOC ioc; + testdbReadDatabase("testioc.dbd", nullptr, nullptr); + testOk1(!testioc_registerRecordDeviceDriver(pdbbase)); + testdbReadDatabase("testqsingle.db", nullptr, nullptr); + ioc.init(); + } +#else + // eg. arrInitialize() had a local "firstTime" flag + testSkip(1, "test ioc reinit did not work yet..."); +#endif + { + ioc::TestIOC ioc; + // https://github.com/epics-base/epics-base/issues/438 asSetFilename("../testioc.acf"); - generalTimeRegisterCurrentProvider("test", 1, &testTimeCurrent); testdbReadDatabase("testioc.dbd", nullptr, nullptr); testOk1(!testioc_registerRecordDeviceDriver(pdbbase)); testdbReadDatabase("testqsingle.db", nullptr, nullptr); @@ -913,6 +928,8 @@ MAIN(testqsingle) timeSim = false; testPutBlock(); } + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); cleanup_for_valgrind(); return testDone(); }