Skip to content

Commit

Permalink
pvalink: porting part 3
Browse files Browse the repository at this point in the history
add pvalink json schema
avoid JSON5 in testpvalink for portability.
fixup build with pvalink
trap bad_weak_ptr open during dtor
  Not sure why this is happening, but need not be CRIT.
c++11, cleanup, and notes
fix pvalink test sync
fix test cleanup on exit
pvalink disconnected link is always INVALID
pvalink logging
pvalink capture Disconnect time
pvalink eliminate providerName
  restrict local to dbChannelTest()
  aka. no qsrv groups
pvalink onTypeChange when attaching link to existing channel
pvalink eliminate unused Connecting state
pvalink add InstCounter
pvalink AfterPut can be const
pvalink add atomic jlif flag
include epicsStdio.h later
  avoid #define printf troubles
assert cleanup state on exit
pvalink add newer lset functions
test link disconnect
testpvalink redo testPutAsync()
pvalink fill out meta-data fetch
pvalink fix FLNK
pvalink cache putReq
pvalink test atomic monitor
pvalink test enum handling
pvalink handle scalar read of empty array
  make it well defined anyway...
pvalink test array of strings
handle db_add_event() failure
handle record._options.DBE
  • Loading branch information
mdavidsaver committed Nov 19, 2023
1 parent aad47ae commit ddd96b9
Show file tree
Hide file tree
Showing 17 changed files with 997 additions and 403 deletions.
36 changes: 36 additions & 0 deletions documentation/pvalink-schema-0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://mdavidsaver.github.io/pvxs/pvalink-schema-0.json",
"title": "PVA Link schema",
"type": ["string", "object"],
"properties": {
"pv": { "type": "string" },
"field": {
"type": "string",
"default": "value"
},
"Q": {
"type": "integer",
"default": 4
},
"proc": {
"type": ["boolean", "string", "null"],
"enum": [true, false, null, "", "NPP", "PP", "CP", "CPP"],
"default": null
},
"sevr": {
"type": ["boolean", "string"],
"enum": [true, false, "NMS", "MS", "MSI", "MSS"],
"default": "NMS"
},
"time": { "type": "boolean", "default": false },
"monorder": { "type": "integer", "default": 0 },
"defer": { "type": "boolean", "default": false },
"retry": { "type": "boolean", "default": false },
"pipeline": { "type": "boolean", "default": false },
"always": { "type": "boolean", "default": false },
"atomic": { "type": "boolean", "default": false },
"local": { "type": "boolean", "default": false }
},
"additionalProperties": false
}
6 changes: 5 additions & 1 deletion ioc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pvxsIoc_SRCS += pvalink_jlif.cpp
pvxsIoc_SRCS += pvalink_link.cpp
pvxsIoc_SRCS += pvalink_lset.cpp

else

pvxsIoc_SRCS += dummygroup.cpp

endif # BASE_7_0

pvxsIoc_LIBS += $(EPICS_BASE_IOC_LIBS)
Expand Down Expand Up @@ -91,4 +95,4 @@ ifdef BASE_7_0
else
../O.Common/pvxsIoc.dbd: ../pvxs3x.dbd
$(CP) $< $@
endif
endif
108 changes: 85 additions & 23 deletions ioc/pvalink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
#include <alarm.h>
#include <epicsExit.h>
#include <epicsAtomic.h>
#include <errlog.h>
#include <link.h>
#include <dbJLink.h>
#include <epicsUnitTest.h>
#include <epicsString.h>

#define PVXS_ENABLE_EXPERT_API

#include <pvxs/server.h>

#include "channel.h"
#include "pvalink.h"
#include "dblocker.h"
#include "dbentry.h"
Expand Down Expand Up @@ -63,10 +67,8 @@ static void shutdownStep2()

{
Guard G(pvaGlobal->lock);
if(pvaGlobal->channels.size()) {
fprintf(stderr, "pvaLink leaves %zu channels open\n",
pvaGlobal->channels.size());
}
assert(pvaLink::cnt_pvaLink<=1u); // dbRemoveLink() already called
assert(pvaGlobal->channels.empty());
}

delete pvaGlobal;
Expand Down Expand Up @@ -185,20 +187,85 @@ void testqsrvCleanup(void)
}
}

void testqsrvWaitForLinkEvent(struct link *plink)
static
std::shared_ptr<pvaLinkChannel> testGetPVALink(struct link *plink)
{
std::shared_ptr<pvaLinkChannel> lchan;
{
DBLocker lock(plink->precord);
DBLocker lock(plink->precord);

if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) {
testAbort("Not a PVA link");
}
pvaLink *pval = static_cast<pvaLink*>(plink->value.json.jlink);
lchan = pval->lchan;
if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) {
testAbort("Not a PVA link");
}
pvaLink *pval = static_cast<pvaLink*>(plink->value.json.jlink);
if(!pval->lchan)
testAbort("PVA link w/o channel?");
return pval->lchan;
}

static
DBLINK* testGetLink(const char *pv)
{
Channel chan(pv);
switch(dbChannelFieldType(chan)) {
case DBF_INLINK:
case DBF_OUTLINK:
case DBF_FWDLINK:
break;
default:
testAbort("%s : not a link field", pv);
}
return static_cast<struct link*>(dbChannelField(chan));
}

void testqsrvWaitForLinkConnected(struct link *plink, bool conn)
{
if(conn)
pvaGlobal->provider_remote.hurryUp();
std::shared_ptr<pvaLinkChannel> lchan(testGetPVALink(plink));
Guard G(lchan->lock);
while(lchan->connected!=conn) {
testDiag("%s(\"%s\", %c) sleep", __func__, plink->precord->name, conn?'C':'D');
UnGuard U(G);
if(!lchan->update_evt.wait(10.0))
testAbort("%s(\"%s\") timeout", __func__, plink->precord->name);
errlogFlush();
testDiag("%s(\"%s\") wakeup", __func__, plink->precord->name);
}
if(lchan) {
lchan->run_done.wait();
errlogFlush();
}

void testqsrvWaitForLinkConnected(const char* pv, bool conn)
{
testqsrvWaitForLinkConnected(testGetLink(pv), conn);
}

QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(struct link *plink)
:plink(plink)
{
std::shared_ptr<pvaLinkChannel> lchan(testGetPVALink(plink));
Guard G(lchan->lock);
seq = lchan->update_seq;
testDiag("%s(\"%s\") arm at %u", __func__, plink->precord->name, seq);
}

QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(const char *pv)
:QSrvWaitForLinkUpdate(testGetLink(pv))
{}

QSrvWaitForLinkUpdate::~QSrvWaitForLinkUpdate()
{
std::shared_ptr<pvaLinkChannel> lchan(testGetPVALink(plink));
Guard G(lchan->lock);
while(seq == lchan->update_seq) {
testDiag("%s(\"%s\") wait for end of %u", __func__, plink->precord->name, seq);
bool ok;
{
UnGuard U(G);
ok = lchan->update_evt.wait(5.0);
}
if(!ok)
testAbort("%s(\"%s\") timeout at %u", __func__, plink->precord->name, seq);
errlogFlush();
testDiag("%s(\"%s\") wake at %u", __func__, plink->precord->name, seq);
}
}

Expand Down Expand Up @@ -252,7 +319,7 @@ void dbpvar(const char *precordname, int level)
}

nchans++;
if(chan->state == pvaLinkChannel::Connected)
if(chan->connected)
nconn++;

if(!precordname)
Expand All @@ -261,24 +328,21 @@ void dbpvar(const char *precordname, int level)
if(level<=0)
continue;

if(level>=2 || (chan->state != pvaLinkChannel::Connected && level==1)) {
if(level>=2 || (!chan->connected && level==1)) {
if(chan->key.first.size()<=28) {
printf("%28s ", chan->key.first.c_str());
} else {
printf("%s\t", chan->key.first.c_str());
}

printf("conn=%c %zu disconnects, %zu type changes",
chan->state == pvaLinkChannel::Connected?'T':'F',
chan->connected?'T':'F',
chan->num_disconnect,
chan->num_type_change);
if(chan->op_put) {
printf(" Put");
}

if(level>=3) {
printf(", provider '%s'", chan->providerName.c_str());
}
printf("\n");
// level 4 reserved for channel/provider details

Expand Down Expand Up @@ -345,8 +409,6 @@ void installPVAAddLinkHook()
initHookRegister(&initPVALink);
IOCShCommand<const char*, int>("dbpvar", "dbpvar", "record name", "level")
.implementation<&dbpvar>();
// epics::registerRefCounter("pvaLinkChannel", &pvaLinkChannel::num_instances);
// epics::registerRefCounter("pvaLink", &pvaLink::num_instances);
}

}} // namespace pvxs::ioc
Expand Down
Loading

0 comments on commit ddd96b9

Please sign in to comment.