Skip to content

Commit

Permalink
fix(inputs.opcua): Allow to retry reads on invalid sessions (#16026)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0abd184)
  • Loading branch information
srebhan committed Oct 28, 2024
1 parent 337a228 commit feaa654
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 73 deletions.
65 changes: 34 additions & 31 deletions plugins/inputs/opcua/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,61 +30,64 @@ to use them.
[[inputs.opcua]]
## Metric name
# name = "opcua"
#

## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840"
#

## Maximum time allowed to establish a connection to the endpoint.
# connect_timeout = "10s"
#

## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"

# Maximum time that a session shall remain open without activity.
## Maximum time that a session shall remain open without activity.
# session_timeout = "20m"
#

## Retry options for failing reads e.g. due to invalid sessions
## If the retry count is zero, the read will fail after the initial attempt.
# read_retry_timeout = "100ms"
# read_retry_count = 0

## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto"
# security_policy = "auto"
#

## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto"
#

## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem"
#

## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem"
#

## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName"

## Username and password required for auth_method = "UserName"
# username = ""
#
## Password. Required for auth_method = "UserName"
# password = ""
#

## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the OPCUA
## client's messages are included in telegraf logs. These messages are very
## noisy, but essential for debugging issues.
# client_trace = false
#

## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#

## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
Expand All @@ -93,12 +96,12 @@ to use them.
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#

## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#

## Bracketed notation
# [[inputs.opcua.nodes]]
# name = "node1"
Expand All @@ -112,7 +115,7 @@ to use them.
# namespace = ""
# identifier_type = ""
# identifier = ""
#

## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
Expand All @@ -126,29 +129,29 @@ to use them.
## Group Metric name. Overrides the top level name. If unset, the
## top level name is used.
# name =
#

## Group default namespace. If a node in the group doesn't set its
## namespace, this is used.
# namespace =
#

## Group default identifier type. If a node in the group doesn't set its
## identifier type, this is used.
# identifier_type =
#

## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.

## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#

## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#

## Bracketed notation
# [[inputs.opcua.group.nodes]]
# name = "node1"
Expand All @@ -165,12 +168,12 @@ to use them.

## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# # additional_valid_status_codes = ["0xC0"]

# [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
# ## Use unregistered reads instead of registered reads
# # use_unregistered_reads = false
```

## Node Configuration
Expand Down
56 changes: 45 additions & 11 deletions plugins/inputs/opcua/read_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package opcua

import (
"context"
"errors"
"fmt"
"time"

"github.com/gopcua/opcua/ua"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/selfstat"
Expand All @@ -17,6 +20,8 @@ type ReadClientWorkarounds struct {
}

// ReadClientConfig holds the TOML-configurable settings from which a
// ReadClient is created via CreateReadClient.
type ReadClientConfig struct {
// ReadRetryTimeout is the time to wait before re-attempting a read that
// failed with a retryable error (e.g. an invalid session). A zero value is
// replaced by 100ms when the client is created.
ReadRetryTimeout config.Duration `toml:"read_retry_timeout"`
// ReadRetries is the number of retries allowed after the initial read
// attempt; with zero, a read fails after the first attempt.
ReadRetries uint64 `toml:"read_retry_count"`
// ReadClientWorkarounds is populated from the "request_workarounds" table.
ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"`
// Embedded input client settings.
// NOTE(review): presumably the options shared by OPC UA input plugins; confirm in plugins/common/opcua/input.
input.InputClientConfig
}
Expand All @@ -25,9 +30,11 @@ type ReadClientConfig struct {
type ReadClient struct {
*input.OpcUAInputClient

ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds
ReadRetryTimeout time.Duration
ReadRetries uint64
ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds

// internal values
reqIDs []*ua.ReadValueID
Expand All @@ -44,8 +51,14 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
"endpoint": inputClient.Config.OpcUAClientConfig.Endpoint,
}

if rc.ReadRetryTimeout == 0 {
rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond)
}

return &ReadClient{
OpcUAInputClient: inputClient,
ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout),
ReadRetries: rc.ReadRetries,
ReadSuccess: selfstat.Register("opcua", "read_success", tags),
ReadError: selfstat.Register("opcua", "read_error", tags),
Workarounds: rc.ReadClientWorkarounds,
Expand Down Expand Up @@ -136,14 +149,35 @@ func (o *ReadClient) read() error {
NodesToRead: o.reqIDs,
}

resp, err := o.Client.Read(o.ctx, req)
if err != nil {
var count uint64
for {
count++

// Try to update the values for all registered nodes
resp, err := o.Client.Read(o.ctx, req)
if err == nil {
// Success, update the node values and exit
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)
}
return nil
}
o.ReadError.Incr(1)
return fmt.Errorf("reading registered nodes failed: %w", err)
}
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)

switch {
case count > o.ReadRetries:
// We exceeded the number of retries and should exit
return fmt.Errorf("reading registered nodes failed after %d attempts: %w", count, err)
case errors.Is(err, ua.StatusBadSessionIDInvalid),
errors.Is(err, ua.StatusBadSessionNotActivated),
errors.Is(err, ua.StatusBadSecureChannelIDInvalid):
// Retry after the defined period as session and channels should be refreshed
o.Log.Debugf("reading failed with %v, retry %d / %d...", err, count, o.ReadRetries)
time.Sleep(o.ReadRetryTimeout)
default:
// Non-retryable error, there is nothing we can do
return fmt.Errorf("reading registered nodes failed: %w", err)
}
}
return nil
}
Loading

0 comments on commit feaa654

Please sign in to comment.