-
Notifications
You must be signed in to change notification settings - Fork 94
Add dataset "device syslog" to non-db client (POC) #83
base: master
Are you sure you want to change the base?
Changes from all commits
407a8a4
c44b32e
fde6d90
dcfe54f
8985eb8
77416a6
fc8b9a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# SONiC Telemetry Syslog Pipeline | ||
* [Overview](#overview) | ||
* [Purpose](#purpose) | ||
* [Syslog Extraction Method](#syslog-extraction-method) | ||
* [Pipeline Design](#pipeline-design) | ||
* [Design Implementation](#design-implementation) | ||
|
||
# Overview | ||
## Purpose: | ||
To replace the original syslog pipeline with one that securely transfers syslog information from the SONiC device to the client using the streaming telemetry service. | |
|
||
|
||
## Syslog Extraction Method | ||
There were two methods considered for extracting the syslogs from the device to be available to the streaming telemetry service: | ||
|
||
* Method 1: Reading the information directly from the log files (/var/log/syslog*) | ||
* Method 2: Streaming the syslog updates from the device's syslog server | ||
|
||
Method 2 was chosen because it avoids the log-rotation issue and uses less CPU time. | |
|
||
# Pipeline Design | ||
|
||
![SONiC TELEMETRY SYSLOG PIPELINE](img/syslogPipeline.PNG) | ||
|
||
The syslog messages are sent to the telemetry service through a newly created syslog server port. The messages are passed to the gRPC server, where they are forwarded to the collector service. | |
|
||
**The gNMI server will receive the syslog information utilizing streaming mode through the OTHERS database.** | |
|
||
|
||
# Design Implementation | ||
In order to create this new functionality, a function and a data set path were added to the [non-db client](/sonic-data-client/non_db_client.go). | |
|
||
This new function was created in order to retrieve and read messages from the syslog server as demonstrated through the following flowchart: | ||
|
||
![SYSLOG NON-DB CLIENT FUNCTION](img/syslogNonDbFlowChart.PNG) | ||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
package client | ||
|
||
import ( | ||
"container/list" | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -28,6 +30,25 @@ const ( | |
|
||
type dataGetFunc func() ([]byte, error) | ||
|
||
// connects the port for the syslog messages | ||
func openPort() { | ||
hostName := "localhost" | ||
portNum := "5150" | ||
service := hostName + ":" + portNum | ||
RemoteAddr, err := net.ResolveUDPAddr("udp4", service) | ||
portConn, err := net.ListenUDP("udp4", RemoteAddr) | ||
if err != nil { | ||
log.V(2).Infof("%v", err) | ||
return | ||
} | ||
syslogPort = portConn | ||
} | ||
|
||
// closes the port for the syslog messages | ||
func closePort() { | ||
syslogPort.Close() | ||
} | ||
|
||
type path2DataFunc struct { | ||
path []string | ||
getFunc dataGetFunc | ||
|
@@ -61,6 +82,9 @@ var ( | |
clientTrie *Trie | ||
statsR statsRing | ||
|
||
// the connection port for the syslog data set | ||
syslogPort *net.UDPConn | ||
|
||
versionFileStash sonicVersionYmlStash | ||
|
||
// ImplIoutilReadFile points to the implementation of ioutil.ReadFile. Should be overridden by UTs only. | ||
|
@@ -101,6 +125,10 @@ var ( | |
path: []string{"OTHERS", "osversion", "build"}, | ||
getFunc: dataGetFunc(getBuildVersion), | ||
}, | ||
{ // Get device syslog | ||
path: []string{"OTHERS", "device", "syslog"}, | ||
getFunc: dataGetFunc(getDeviceSyslog), | ||
}, | ||
} | ||
) | ||
|
||
|
@@ -229,6 +257,23 @@ func getCpuUtil() ([]byte, error) { | |
return b, nil | ||
} | ||
|
||
func getDeviceSyslog() ([]byte, error) { | ||
bufferSize := 1024 | ||
buffer := make([]byte, bufferSize) | ||
payloadSize, _, err := syslogPort.ReadFromUDP(buffer) | ||
if err != nil { | ||
log.V(2).Infof("%v", err) | ||
return buffer, err | ||
} | ||
if bufferSize < payloadSize { | ||
log.V(2).Infof("Payload was larger than buffer", err) | ||
return nil, nil | ||
} | ||
|
||
log.V(4).Infof("getDeviceSyslog, output %v", string(buffer[:payloadSize])) | ||
return buffer[:payloadSize], nil | ||
} | ||
|
||
func getProcMeminfo() ([]byte, error) { | ||
memInfo, _ := linuxproc.ReadMemInfo("/proc/meminfo") | ||
b, err := json.Marshal(memInfo) | ||
|
@@ -428,6 +473,10 @@ func (c *NonDbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *s | |
for _, sub := range subscribe.GetSubscription() { | ||
subMode := sub.GetMode() | ||
if subMode != gnmipb.SubscriptionMode_SAMPLE { | ||
if subMode == gnmipb.SubscriptionMode_ON_CHANGE { | ||
streamOnChange(c, stop, sub) | ||
return | ||
} | ||
putFatalMsg(c.q, fmt.Sprintf("Unsupported subscription mode: %v.", subMode)) | ||
return | ||
} | ||
|
@@ -494,6 +543,55 @@ func streamSample(c *NonDbClient, stop chan struct{}, sub *gnmipb.Subscription, | |
} | ||
} | ||
|
||
func streamOnChange(c *NonDbClient, stop chan struct{}, sub *gnmipb.Subscription) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does each subscriber have their owner streamOnChange? If how will you handle the case two client subscribe to syslog as the same time? have you tested it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you plan to enable onchange mode for other dataset? If not you should disable them in the call flow to avoid expected issue in the server. |
||
storageBuffer := list.New() | ||
gnmiPath := sub.GetPath() | ||
openPort() | ||
getter, _ := c.path2Getter[gnmiPath] | ||
|
||
c.q.Put(Value{ | ||
&spb.Value{ | ||
Timestamp: time.Now().UnixNano(), | ||
SyncResponse: true, | ||
}, | ||
}) | ||
|
||
for { | ||
elem, _ := getter() | ||
// adds new elem to storage space | ||
storageBuffer.PushBack(elem) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you get elem as nil here and how do you handle it? |
||
select { | ||
case <-stop: | ||
log.V(1).Infof("Stopping NonDbClient.streamOnChange routine for sub '%s'", sub) | ||
// closes the port once the connection has been made | ||
closePort() | ||
return | ||
|
||
default: | ||
// prints value to the gnmi client | ||
spbv := &spb.Value{ | ||
Prefix: c.prefix, | ||
Path: gnmiPath, | ||
Timestamp: time.Now().UnixNano(), | ||
SyncResponse: false, | ||
Val: &gnmipb.TypedValue{ | ||
Value: &gnmipb.TypedValue_JsonIetfVal{ | ||
JsonIetfVal: storageBuffer.Back().Value.([]byte), | ||
}}, | ||
} | ||
err := c.q.Put(Value{spbv}) | ||
if err != nil { | ||
log.V(3).Infof("Failed to put for %v, %v", gnmiPath, err) | ||
} else { | ||
log.V(6).Infof("Added spbv #%v", spbv) | ||
} | ||
// removes the message once it has been sent to the client | ||
storageBuffer.Remove(storageBuffer.Front()) | ||
} | ||
|
||
} | ||
} | ||
|
||
// runGetterAndSend runs a given getter method and puts the result to client queue. | ||
func runGetterAndSend(c *NonDbClient, gnmiPath *gnmipb.Path, getter dataGetFunc) error { | ||
v, err := getter() | ||
|
@@ -581,7 +679,7 @@ func (c *NonDbClient) Close() error { | |
return nil | ||
} | ||
|
||
func (c *NonDbClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { | ||
func (c *NonDbClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what change did you make here? |
||
return nil | ||
} | ||
func (c *NonDbClient) Capabilities() []gnmipb.ModelData { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to change the comment of the StreamRun as well: "It supports SAMPLE mode only."