Skip to content

Commit

Permalink
chore(deps): Bump github.com/gopcua/opcua from 0.4.0 to 0.5.3 (#14686)
Browse files Browse the repository at this point in the history
Co-authored-by: Josh Powers <powersj@fastmail.com>
  • Loading branch information
dependabot[bot] and powersj authored Feb 9, 2024
1 parent f5cf3ff commit cb81959
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/google/gopacket v1.1.19
github.com/google/licensecheck v0.3.1
github.com/google/uuid v1.5.0
github.com/gopcua/opcua v0.4.0
github.com/gopcua/opcua v0.5.3
github.com/gophercloud/gophercloud v1.7.0
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,8 @@ github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gopcua/opcua v0.4.0 h1:Pr0PMFViNOzvkcvmzP3yTKqtLFVL1OUgav3tDj+hpqQ=
github.com/gopcua/opcua v0.4.0/go.mod h1:6BsaYGu33RhVRxnK+EqHWwSG+hYCSAMjyIjx3RGV1PQ=
github.com/gopcua/opcua v0.5.3 h1:K5QQhjK9KQxQW8doHL/Cd8oljUeXWnJJsNgP7mOGIhw=
github.com/gopcua/opcua v0.5.3/go.mod h1:nrVl4/Rs3SDQRhNQ50EbAiI5JSpDrTG6Frx3s4HLnw4=
github.com/gophercloud/gophercloud v1.7.0 h1:fyJGKh0LBvIZKLvBWvQdIgkaV5yTM3Jh9EYUh+UNCAs=
github.com/gophercloud/gophercloud v1.7.0/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
Expand Down
11 changes: 7 additions & 4 deletions plugins/common/opcua/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
}

// Connect to an OPC UA device
func (o *OpcUAClient) Connect() error {
func (o *OpcUAClient) Connect(ctx context.Context) error {
o.Log.Debug("Connecting OPC UA Client to server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
Expand All @@ -190,14 +190,17 @@ func (o *OpcUAClient) Connect() error {

if o.Client != nil {
o.Log.Warnf("Closing connection to %q as already connected", u)
if err := o.Client.Close(); err != nil {
if err := o.Client.Close(ctx); err != nil {
// Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err)
}
}

o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...)
o.Client, err = opcua.NewClient(o.Config.Endpoint, o.opts...)
if err != nil {
return fmt.Errorf("error in new client: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
if err := o.Client.Connect(ctx); err != nil {
Expand All @@ -221,7 +224,7 @@ func (o *OpcUAClient) Disconnect(ctx context.Context) error {
switch u.Scheme {
case "opc.tcp":
// We can't do anything about failing to close a connection
err := o.Client.CloseWithContext(ctx)
err := o.Client.Close(ctx)
o.Client = nil
return err
default:
Expand Down
2 changes: 1 addition & 1 deletion plugins/common/opcua/input/input_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func TestMetricForNode(t *testing.T) {
status: ua.StatusOK,
expected: metric.New("testingmetric",
map[string]string{"t1": "v1", "id": "ns=3;s=hi"},
map[string]interface{}{"Quality": "OK (0x0)", "fn": 16},
map[string]interface{}{"Quality": "The operation succeeded. StatusGood (0x0)", "fn": 16},
time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})),
},
}
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/opcua/opcua_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,12 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
Expand Down
9 changes: 6 additions & 3 deletions plugins/inputs/opcua/read_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ReadClient struct {

// internal values
req *ua.ReadRequest
ctx context.Context
}

func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) {
Expand All @@ -52,7 +53,9 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
}

func (o *ReadClient) Connect() error {
if err := o.OpcUAClient.Connect(); err != nil {
o.ctx = context.Background()

if err := o.OpcUAClient.Connect(o.ctx); err != nil {
return fmt.Errorf("connect failed: %w", err)
}

Expand All @@ -68,7 +71,7 @@ func (o *ReadClient) Connect() error {
readValueIds = append(readValueIds, &ua.ReadValueID{NodeID: nid})
}
} else {
regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{
regResp, err := o.Client.RegisterNodes(o.ctx, &ua.RegisterNodesRequest{
NodesToRegister: o.NodeIDs,
})
if err != nil {
Expand Down Expand Up @@ -133,7 +136,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) {
}

func (o *ReadClient) read() error {
resp, err := o.Client.Read(o.req)
resp, err := o.Client.Read(o.ctx, o.req)
if err != nil {
o.ReadError.Incr(1)
return fmt.Errorf("RegisterNodes Read failed: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/opcua_listener/opcua_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
"The operation succeeded. StatusGood (0x0)",
"The operation succeeded. StatusGood (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
Expand Down
18 changes: 10 additions & 8 deletions plugins/inputs/opcua_listener/subscribe_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type SubscribeClient struct {
dataNotifications chan *opcua.PublishNotificationData
metrics chan telegraf.Metric

processingCtx context.Context
processingCancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
}

func checkDataChangeFilterParameters(params *input.DataChangeFilter) error {
Expand Down Expand Up @@ -91,6 +91,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
return nil, err
}

processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &SubscribeClient{
OpcUAInputClient: client,
Config: *sc,
Expand All @@ -100,6 +101,8 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
dataNotifications: make(chan *opcua.PublishNotificationData, 100),
metrics: make(chan telegraf.Metric, 100),
ctx: processingCtx,
cancel: processingCancel,
}

log.Debugf("Creating monitored items")
Expand All @@ -116,13 +119,13 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
}

func (o *SubscribeClient) Connect() error {
err := o.OpcUAClient.Connect()
err := o.OpcUAClient.Connect(o.ctx)
if err != nil {
return err
}

o.Log.Debugf("Creating OPC UA subscription")
o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{
o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{
Interval: time.Duration(o.Config.SubscriptionInterval),
}, o.dataNotifications)
if err != nil {
Expand All @@ -145,7 +148,7 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
}
}
closing := o.OpcUAInputClient.Stop(ctx)
o.processingCancel()
o.cancel()
return closing
}

Expand All @@ -167,7 +170,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
return nil, err
}

resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
if err != nil {
return nil, fmt.Errorf("failed to start monitoring items: %w", err)
}
Expand All @@ -187,7 +190,6 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
}
}

o.processingCtx, o.processingCancel = context.WithCancel(context.Background())
go o.processReceivedNotifications()

return o.metrics, nil
Expand All @@ -196,7 +198,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
func (o *SubscribeClient) processReceivedNotifications() {
for {
select {
case <-o.processingCtx.Done():
case <-o.ctx.Done():
o.Log.Debug("Processing received notifications stopped")
return

Expand Down

0 comments on commit cb81959

Please sign in to comment.