Skip to content

Commit

Permalink
fix(opcua): fix certificate management and reconnection ; switch to n…
Browse files Browse the repository at this point in the history
…ode-opcua-client (lighter)
  • Loading branch information
burgerni10 authored and Nicolas Burger committed Aug 25, 2022
1 parent 633a49d commit fcfe8e7
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 1,660 deletions.
1,473 changes: 156 additions & 1,317 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
"mysql2": "2.3.3",
"nanoid": "3.3.4",
"node-fetch": "2.6.7",
"node-opcua": "2.74.0",
"node-opcua-certificate-manager": "2.74.0",
"node-opcua-client": "2.74.0",
"object-path": "0.11.8",
"papaparse": "5.3.2",
"pg": "8.7.3",
Expand Down
2 changes: 1 addition & 1 deletion src/engine/OIBusEngine.class.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class OIBusEngine extends BaseEngine {
}) => {
if (scanMode !== 'listen') {
const job = timexe(cronTime, () => {
// on each scan, activate each protocols
// on each scan, activate each protocol
this.scanLists[scanMode].forEach(async (id) => {
await this.activeProtocols[id].onScan(scanMode)
})
Expand Down
48 changes: 28 additions & 20 deletions src/south/OPCUA/OPCUA_DA/OPCUA_DA.class.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
const Opcua = require('node-opcua')
const {
OPCUAClient,
MessageSecurityMode,
SecurityPolicy,
UserTokenType,
} = require('node-opcua-client')
const { OPCUACertificateManager } = require('node-opcua-certificate-manager')
const ProtocolHandler = require('../../ProtocolHandler.class')
const { initOpcuaCertificateFolders } = require('../opcua.service')
Expand Down Expand Up @@ -54,14 +59,18 @@ class OPCUA_DA extends ProtocolHandler {
*/
async connect() {
await super.connect()
await this.session?.close() // close the session if it already exists
await this.connectToOpcuaServer()
}

async init() {
await super.init()
await initOpcuaCertificateFolders(this.encryptionService.certsFolder)
if (!this.clientCertificateManager) {
this.clientCertificateManager = new OPCUACertificateManager({ rootFolder: `${this.encryptionService.certsFolder}/opcua` })
this.clientCertificateManager = new OPCUACertificateManager({
rootFolder: `${this.encryptionService.certsFolder}/opcua`,
automaticallyAcceptUnknownCertificate: true,
})
// Set the state to the CertificateManager to 2 (Initialized) to avoid a call to openssl
// It is useful for offline instances of OIBus where downloading openssl is not possible
this.clientCertificateManager.state = 2
Expand Down Expand Up @@ -131,8 +140,8 @@ class OPCUA_DA extends ProtocolHandler {
}

if (this.connected) {
await this.session.close()
await this.client.disconnect()
await this.session?.close()
this.session = null
}
await super.disconnect()
}
Expand All @@ -147,7 +156,7 @@ class OPCUA_DA extends ProtocolHandler {
return
}
this.subscription = Opcua.ClientSubscription.create(this.session, {
this.subscription = ClientSubscription.create(this.session, {
requestedPublishingInterval: 150,
requestedLifetimeCount: 10 * 60 * 10,
requestedMaxKeepAliveCount: 10,
Expand All @@ -156,18 +165,18 @@ class OPCUA_DA extends ProtocolHandler {
priority: 6,
})
nodesToMonitor.forEach((nodeToMonitor) => {
const monitoredItem = Opcua.ClientMonitoredItem.create(
const monitoredItem = ClientMonitoredItem.create(
this.subscription,
{
nodeId: nodeToMonitor,
attributeId: Opcua.AttributeIds.Value,
attributeId: AttributeIds.Value,
},
{
samplingInterval: 2,
discardOldest: true,
queueSize: 1,
},
Opcua.TimestampsToReturn.Neither,
TimestampsToReturn.Neither,
)
monitoredItem.on('changed', (dataValue) => this.manageDataValues([dataValue], nodesToMonitor))
Expand All @@ -194,38 +203,37 @@ class OPCUA_DA extends ProtocolHandler {
const options = {
applicationName: 'OIBus',
connectionStrategy,
securityMode: Opcua.MessageSecurityMode[this.securityMode],
securityPolicy: Opcua.SecurityPolicy[this.securityPolicy],
securityMode: MessageSecurityMode[this.securityMode],
securityPolicy: SecurityPolicy[this.securityPolicy],
endpointMustExist: false,
keepSessionAlive: this.keepSessionAlive,
keepPendingSessionsOnDisconnect: false,
clientName: this.clientName, // the id of the connector
clientCertificateManager: this.clientCertificateManager,
}
this.client = Opcua.OPCUAClient.create(options)
await this.client.connect(this.url)
let userIdentity = null
let userIdentity
if (this.certificate.privateKey && this.certificate.cert) {
userIdentity = {
type: Opcua.UserTokenType.Certificate,
type: UserTokenType.Certificate,
certificateData: this.certificate.cert,
privateKey: Buffer.from(this.certificate.privateKey, 'utf-8').toString(),
}
} else if (this.username) {
userIdentity = {
type: Opcua.UserTokenType.UserName,
type: UserTokenType.UserName,
userName: this.username,
password: this.encryptionService.decryptText(this.password),
}
} else {
userIdentity = { type: UserTokenType.Anonymous }
}
this.session = await this.client.createSession(userIdentity)
this.session = await OPCUAClient.createSession(this.url, userIdentity, options)
this.connected = true
this.logger.info('OPCUA_DA Connected')
this.logger.info(`OPCUA_DA ${this.dataSource.name} connected`)
this.updateStatusDataStream({ 'Connected at': new Date().toISOString() })
} catch (error) {
this.logger.error(error)
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout)
}
await this.disconnect()
this.reconnectTimeout = setTimeout(this.connectToOpcuaServer.bind(this), this.retryInterval)
}
}
Expand Down
160 changes: 41 additions & 119 deletions src/south/OPCUA/OPCUA_DA/OPCUA_DA.class.spec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
const Opcua = require('node-opcua')
const Opcua = require('node-opcua-client')
const OPCUA_DA = require('./OPCUA_DA.class')
const { defaultConfig: config } = require('../../../../tests/testConfig')
const EncryptionService = require('../../../services/EncryptionService.class')

// Mock node-opcua
jest.mock('node-opcua', () => ({
OPCUAClient: { create: jest.fn() },
jest.mock('node-opcua-client', () => ({
OPCUAClient: { createSession: jest.fn() },
MessageSecurityMode: { None: 1 },
SecurityPolicy: { None: 'http://opcfoundation.org/UA/SecurityPolicy#None' },
UserTokenType: { UserName: 1, Certificate: 2 },
UserTokenType: { Anonymous: 0, UserName: 1, Certificate: 2 },
}))
jest.mock('node-opcua-certificate-manager', () => ({ OPCUACertificateManager: jest.fn(() => ({})) }))

Expand Down Expand Up @@ -70,18 +70,8 @@ beforeEach(async () => {

describe('OPCUA-DA south', () => {
it('should be properly initialized', () => {
expect(opcuaSouth.url)
.toEqual(opcuaConfig.OPCUA_DA.url)
expect(opcuaSouth.retryInterval)
.toEqual(opcuaConfig.OPCUA_DA.retryInterval)
})

it('should properly connect and set lastCompletedAt from database', async () => {
opcuaSouth.connectToOpcuaServer = jest.fn()
await opcuaSouth.connect()

expect(opcuaSouth.connectToOpcuaServer)
.toHaveBeenCalledTimes(1)
expect(opcuaSouth.url).toEqual(opcuaConfig.OPCUA_DA.url)
expect(opcuaSouth.retryInterval).toEqual(opcuaConfig.OPCUA_DA.retryInterval)
})

it('should properly connect to OPC UA server without password', async () => {
Expand All @@ -97,25 +87,16 @@ describe('OPCUA-DA south', () => {
securityPolicy: Opcua.SecurityPolicy.None,
endpointMustExist: false,
keepSessionAlive: false,
keepPendingSessionsOnDisconnect: false,
clientCertificateManager: { state: 2 },
}
Opcua.OPCUAClient.create.mockReturnValue({
connect: jest.fn().mockReturnValue({}),
createSession: jest.fn().mockReturnValue({}),
})
const expectedUserIdentity = { type: 0 }
await opcuaSouth.connect()

expect(Opcua.OPCUAClient.create)
.toBeCalledWith(expectedOptions)
expect(opcuaSouth.client.connect)
.toBeCalledWith(opcuaConfig.OPCUA_DA.url)
expect(opcuaSouth.client.createSession)
.toBeCalledTimes(1)
expect(opcuaSouth.connected)
.toBeTruthy()
expect(setTimeoutSpy)
.not
.toBeCalled()
expect(Opcua.OPCUAClient.createSession).toBeCalledTimes(1)
expect(Opcua.OPCUAClient.createSession).toBeCalledWith(opcuaSouth.url, expectedUserIdentity, expectedOptions)
expect(opcuaSouth.connected).toBeTruthy()
expect(setTimeoutSpy).not.toBeCalled()
})

it('should properly connect to OPC UA server with password', async () => {
Expand All @@ -131,12 +112,9 @@ describe('OPCUA-DA south', () => {
securityPolicy: Opcua.SecurityPolicy.None,
endpointMustExist: false,
keepSessionAlive: false,
keepPendingSessionsOnDisconnect: false,
clientCertificateManager: { state: 2 },
}
Opcua.OPCUAClient.create.mockReturnValue({
connect: jest.fn(),
createSession: jest.fn(),
})
opcuaSouth.username = 'username'
opcuaSouth.password = 'password'

Expand All @@ -149,52 +127,21 @@ describe('OPCUA-DA south', () => {
userName: 'username',
password: 'password',
}
expect(Opcua.OPCUAClient.create)
.toBeCalledWith(expectedOptions)
expect(opcuaSouth.client.connect)
.toBeCalledWith(opcuaConfig.OPCUA_DA.url)
expect(opcuaSouth.client.createSession)
.toBeCalledWith(expectedUserIdentity)
expect(opcuaSouth.connected)
.toBeTruthy()
expect(setTimeoutSpy)
.not
.toBeCalled()
expect(Opcua.OPCUAClient.createSession).toBeCalledWith(opcuaSouth.url, expectedUserIdentity, expectedOptions)
expect(opcuaSouth.connected).toBeTruthy()
expect(setTimeoutSpy).not.toBeCalled()
})

it('should properly retry connection to OPC UA server', async () => {
const setTimeoutSpy = jest.spyOn(global, 'setTimeout')
const expectedOptions = {
applicationName: 'OIBus',
clientName: 'myConnectorId',
connectionStrategy: {
initialDelay: 1000,
maxRetry: 1,
},
securityMode: Opcua.MessageSecurityMode.None,
securityPolicy: Opcua.SecurityPolicy.None,
endpointMustExist: false,
keepSessionAlive: false,
clientCertificateManager: { state: 2 },
}
Opcua.OPCUAClient.create.mockReturnValue({
connect: jest.fn(() => Promise.reject()),
createSession: jest.fn(),
})

Opcua.OPCUAClient.createSession.mockReturnValue(new Promise((resolve, reject) => {
reject(new Error('test'))
}))
await opcuaSouth.connect()

expect(Opcua.OPCUAClient.create)
.toBeCalledWith(expectedOptions)
expect(opcuaSouth.client.connect)
.toBeCalledWith(opcuaConfig.OPCUA_DA.url)
expect(opcuaSouth.client.createSession)
.not
.toBeCalled()
expect(opcuaSouth.connected)
.toBeFalsy()
expect(setTimeoutSpy)
.toHaveBeenLastCalledWith(expect.any(Function), opcuaConfig.OPCUA_DA.retryInterval)
expect(opcuaSouth.connected).toBeFalsy()
expect(setTimeoutSpy).toHaveBeenLastCalledWith(expect.any(Function), opcuaConfig.OPCUA_DA.retryInterval)
})

it('should quit onScan if not connected', async () => {
Expand All @@ -203,9 +150,7 @@ describe('OPCUA-DA south', () => {
opcuaSouth.session = { readHistoryValue: jest.fn() }
await opcuaSouth.lastPointQuery(opcuaConfig.points[0].scanMode)

expect(opcuaSouth.session.readHistoryValue)
.not
.toBeCalled()
expect(opcuaSouth.session.readHistoryValue).not.toBeCalled()
})

it('should quit onScan if scanMode has no points to read', async () => {
Expand All @@ -225,9 +170,7 @@ describe('OPCUA-DA south', () => {
opcuaSouthTest.session = { readVariableValue: jest.fn() }
await opcuaSouthTest.lastPointQuery(opcuaConfig.points[0].scanMode)

expect(opcuaSouthTest.session.readVariableValue)
.not
.toBeCalled()
expect(opcuaSouthTest.session.readVariableValue).not.toBeCalled()
})

it('should properly call readVariableValue() and addValues()', async () => {
Expand All @@ -246,67 +189,46 @@ describe('OPCUA-DA south', () => {
opcuaSouth.addValues = jest.fn()
await opcuaSouth.lastPointQuery(opcuaConfig.points[0].scanMode)

expect(opcuaSouth.session.readVariableValue)
.toBeCalledWith(['ns=3;s=Random'])
expect(opcuaSouth.addValues)
.toBeCalledWith([
{
data: {
quality: JSON.stringify({ value: 0 }),
value: 666,
},
pointId: 'Random',
timestamp: new Date(nowDateString).toISOString(),
expect(opcuaSouth.session.readVariableValue).toBeCalledWith(['ns=3;s=Random'])
expect(opcuaSouth.addValues).toBeCalledWith([
{
data: {
quality: JSON.stringify({ value: 0 }),
value: 666,
},
])
pointId: 'Random',
timestamp: new Date(nowDateString).toISOString(),
},
])

global.Date = RealDate
})

it('should properly disconnect when trying to connect', async () => {
const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout')
Opcua.OPCUAClient.create.mockReturnValue({
connect: jest.fn(),
createSession: jest.fn(),
disconnect: jest.fn(),
})

await opcuaSouth.connect()
opcuaSouth.reconnectTimeout = true
opcuaSouth.connected = false
opcuaSouth.session = { close: jest.fn() }
const close = jest.fn()
opcuaSouth.session = { close }
await opcuaSouth.disconnect()

expect(clearTimeoutSpy)
.toBeCalled()
expect(opcuaSouth.session.close)
.not
.toBeCalled()
expect(opcuaSouth.client.disconnect)
.not
.toBeCalled()
expect(clearTimeoutSpy).toBeCalled()
expect(close).not.toBeCalled()
})

it('should properly disconnect when connected', async () => {
const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout')
Opcua.OPCUAClient.create.mockReturnValue({
connect: jest.fn(),
createSession: jest.fn(),
disconnect: jest.fn(),
})

await opcuaSouth.connect()
opcuaSouth.reconnectTimeout = false
opcuaSouth.connected = true
opcuaSouth.session = { close: jest.fn() }
const close = jest.fn()
opcuaSouth.session = { close }
await opcuaSouth.disconnect()

expect(clearTimeoutSpy)
.not
.toBeCalled()
expect(opcuaSouth.session.close)
.toBeCalled()
expect(opcuaSouth.client.disconnect)
.toBeCalled()
expect(clearTimeoutSpy).not.toBeCalled()
expect(close).toBeCalled()
})
})
Loading

0 comments on commit fcfe8e7

Please sign in to comment.