diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 579d5548..bda9f77f 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -19,6 +19,10 @@ resources: type: github name: sonic-net/sonic-mgmt-common endpoint: sonic-net + - repository: sonic-swss-common + type: github + name: sonic-net/sonic-swss-common + endpoint: sonic-net stages: - stage: Build @@ -47,7 +51,12 @@ stages: - checkout: sonic-mgmt-common clean: true submodules: recursive - displayName: 'Checkout code' + displayName: 'Checkout sonic-mgmt-common' + + - checkout: sonic-swss-common + clean: true + submodules: recursive + displayName: 'Checkout sonic-swss-common' - task: DownloadPipelineArtifact@2 inputs: @@ -81,6 +90,16 @@ stages: sudo dpkg -i ../target/debs/buster/libyang*1.0.73*.deb displayName: "Install dependency" + - script: | + # LIBSWSSCOMMON + sudo apt-get -y purge libhiredis-dev libnl-3-dev libnl-route-3-dev + sudo dpkg -i ../target/debs/buster/libnl-3-200_*.deb + sudo dpkg -i ../target/debs/buster/libnl-genl-3-200_*.deb + sudo dpkg -i ../target/debs/buster/libnl-route-3-200_*.deb + sudo dpkg -i ../target/debs/buster/libnl-nf-3-200_*.deb + sudo dpkg -i ../target/debs/buster/libhiredis0.14_*.deb + displayName: "Install libswsscommon dependencies" + - script: | set -ex # Install .NET CORE @@ -90,6 +109,25 @@ stages: sudo apt-get install -y dotnet-sdk-5.0 displayName: "Install .NET CORE" + - task: DownloadPipelineArtifact@2 + inputs: + source: specific + project: build + pipeline: Azure.sonic-swss-common + artifact: sonic-swss-common + runVersion: 'latestFromBranch' + runBranch: 'refs/heads/master' + displayName: "Download sonic-swss-common" + + - script: | + set -ex + # LIBSWSSCOMMON + sudo dpkg -i libswsscommon_1.0.0_amd64.deb + sudo dpkg -i libswsscommon-dev_1.0.0_amd64.deb + sudo dpkg -i python3-swsscommon_1.0.0_amd64.deb + workingDirectory: $(Pipeline.Workspace)/ + displayName: 'Install libswsscommon package' + - script: | set -ex ls -l diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index da4ab17d..54c69de4 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -30,17 +30,28 @@ type Client struct { // Wait for all sub go routine to finish w sync.WaitGroup fatal bool + logLevel int } +// Syslog level for error +const logLevelError int = 3 +const logLevelDebug int = 7 +const logLevelMax int = logLevelDebug + // NewClient returns a new initialized client. func NewClient(addr net.Addr) *Client { pq := queue.NewPriorityQueue(1, false) return &Client{ addr: addr, q: pq, + logLevel: logLevelError, } } +func (c *Client) setLogLevel(lvl int) { + c.logLevel = lvl +} + // String returns the target the client is querying. func (c *Client) String() string { return c.addr.String() @@ -121,8 +132,12 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { } var dc sdc.Client + mode := c.subscribe.GetMode() + if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) + } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { + dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) } else if _, ok, _, _ := sdc.IsTargetDb(target); ok { dc, err = sdc.NewDbClient(paths, prefix) } else { @@ -134,7 +149,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.NotFound, "%v", err) } - switch mode := c.subscribe.GetMode(); mode { + switch mode { case gnmipb.SubscriptionList_STREAM: c.stop = make(chan struct{}, 1) c.w.Add(1) @@ -155,7 +170,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { log.V(1).Infof("Client %s running", c) go c.recv(stream) - err = c.send(stream) + err = c.send(stream, dc) c.Close() // Wait until all child go routines exited c.w.Wait() @@ -226,8 +241,9 @@ func (c *Client) recv(stream gnmipb.GNMI_SubscribeServer) { } // send runs until process Queue returns an error. -func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error { +func (c *Client) send(stream gnmipb.GNMI_SubscribeServer, dc sdc.Client) error { for { + var val *sdc.Value items, err := c.q.Get(1) if items == nil { @@ -241,12 +257,14 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error { } var resp *gnmipb.SubscribeResponse + switch v := items[0].(type) { case sdc.Value: if resp, err = sdc.ValToResp(v); err != nil { c.errors++ return err } + val = &v; default: log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c) c.errors++ @@ -257,8 +275,11 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error { if err != nil { log.V(1).Infof("Client %s sending error:%v", c, err) c.errors++ + dc.FailedSend() return err } + + dc.SentOne(val) log.V(5).Infof("Client %s done sending, msg count %d, msg %v", c, c.sendMsg, resp) } } diff --git a/gnmi_server/server.go b/gnmi_server/server.go index c3717f34..7001fd52 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -47,6 +47,7 @@ type Config struct { // Port for the Server to listen on. If 0 or unset the Server will pick a port // for this Server. Port int64 + LogLevel int UserAuth AuthTypes } @@ -234,6 +235,8 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error { c := NewClient(pr.Addr) + c.setLogLevel(s.config.LogLevel) + s.cMu.Lock() if oc, ok := s.clients[c.String()]; ok { log.V(2).Infof("Delete duplicate client %s", oc) diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index bd10e05a..b2bb3d51 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -8,6 +8,7 @@ import ( "flag" "fmt" "strings" + "unsafe" testcert "github.com/sonic-net/sonic-gnmi/testdata/tls" "github.com/go-redis/redis" @@ -516,6 +517,20 @@ func createQueryOrFail(t *testing.T, subListMode pb.SubscriptionList_Mode, targe return *q } +// create query for subscribing to events. +func createEventsQuery(t *testing.T, paths ...string) client.Query { + return createQueryOrFail(t, + pb.SubscriptionList_STREAM, + "EVENTS", + []subscriptionQuery{ + { + Query: paths, + SubMode: pb.SubscriptionMode_ON_CHANGE, + }, + }, + false) +} + // createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode. func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query { return createQueryOrFail(t, @@ -2587,6 +2602,120 @@ func TestAuthCapabilities(t *testing.T) { } +func TestClient(t *testing.T) { + events := [] sdc.Evt_rcvd { + { "test0", 7, 777 }, + { "test1", 6, 677 }, + { "test2", 5, 577 }, + { "test3", 4, 477 }, + } + + HEARTBEAT_SET := 5 + heartbeat := 0 + event_index := 0 + rcv_timeout := sdc.SUBSCRIBER_TIMEOUT + deinit_done := false + + mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func() unsafe.Pointer { + return nil + }) + defer mock1.Reset() + + mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) { + rc := (int)(0) + var evt sdc.Evt_rcvd + + if event_index < len(events) { + evt = events[event_index] + event_index++ + } else { + time.Sleep(time.Millisecond * time.Duration(rcv_timeout)) + rc = -1 + } + return rc, evt + }) + defer mock2.Reset() + + mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) { + heartbeat = val + }) + + defer mock3.Reset() + + mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) { + deinit_done = true + }) + + defer mock4.Reset() + + s := createServer(t, 8081) + go runServer(t, s) + + qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET) + q := createEventsQuery(t, qstr) + // q := createEventsQuery(t, "all") + q.Addrs = []string{"127.0.0.1:8081"} + + tests := []struct { + desc string + pub_data []string + wantErr bool + wantNoti []client.Notification + pause int + poll int + } { + { + desc: "base client create", + poll: 3, + }, + } + + sdc.C_init_subs() + + for _, tt := range tests { + heartbeat = 0 + deinit_done = false + t.Run(tt.desc, func(t *testing.T) { + c := client.New() + defer c.Close() + + var gotNoti []string + q.NotificationHandler = func(n client.Notification) error { + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + str := fmt.Sprintf("%v", nn.Val) + gotNoti = append(gotNoti, str) + } + return nil + } + + go func() { + c.Subscribe(context.Background(), q) + }() + + // wait for half second for subscribeRequest to sync + time.Sleep(time.Millisecond * 2000) + + if len(events) != len(gotNoti) { + t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)) + } + + if (heartbeat != HEARTBEAT_SET) { + t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET) + } + fmt.Printf("DONE: events:%d gotNoti=%d\n", len(events), len(gotNoti)) + }) + time.Sleep(time.Millisecond * 1000) + + if (deinit_done == false) { + t.Errorf("Events client deinit *NOT* called.") + } + // t.Log("END of a TEST") + } + + s.s.Stop() +} + func init() { // Enable logs at UT setup flag.Lookup("v").Value.Set("10") diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index dfc742bb..a7e5a862 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -51,6 +51,12 @@ type Client interface { // Close provides implemenation for explicit cleanup of Client Close() error + + // callbacks on send failed + FailedSend() + + // callback on sent + SentOne(*Value) } type Stream interface { @@ -271,7 +277,6 @@ func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.W SyncResponse: false, Val: val, } - c.q.Put(Value{spbv}) log.V(6).Infof("Added spbv #%v", spbv) } @@ -488,7 +493,7 @@ func populateDbtablePath(prefix, path *gnmipb.Path, pathG2S *map[*gnmipb.Path][] } if targetDbName == "COUNTERS_DB" { - err := initCountersPortNameMap() + err := initCountersPortNameMap() if err != nil { return err } @@ -1230,6 +1235,12 @@ func (c *DbClient) Capabilities() []gnmipb.ModelData { return nil } +func (c *DbClient) SentOne(val *Value) { +} + +func (c *DbClient) FailedSend() { +} + // validateSampleInterval validates the sampling interval of the given subscription. func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) { requestedInterval := time.Duration(sub.GetSampleInterval()) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go new file mode 100644 index 00000000..3c6533a4 --- /dev/null +++ b/sonic_data_client/events_client.go @@ -0,0 +1,386 @@ +package client + +/* +#cgo CFLAGS: -g -Wall -I../../sonic-swss-common/common -Wformat -Werror=format-security -fPIE +#cgo LDFLAGS: -L/usr/lib -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon +#include +#include +#include +#include "events_wrap.h" +*/ +import "C" + +import ( + "strconv" + "fmt" + "reflect" + "sync" + "time" + "unsafe" + + "github.com/go-redis/redis" + + spb "github.com/sonic-net/sonic-gnmi/proto" + sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds +const EVENT_BUFFSZ = 4096 + +const LATENCY_LIST_SIZE = 10 // Size of list of latencies. +const PQ_MAX_SIZE = 10240 // Max cnt of pending events in PQ. + +// STATS counters +const MISSED = "COUNTERS_EVENTS:missed_internal" +const DROPPED = "COUNTERS_EVENTS:missed_by_slow_receiver" +const LATENCY = "COUNTERS_EVENTS:latency_in_ms" + +var STATS_CUMULATIVE_KEYS = [...]string {MISSED, DROPPED} +var STATS_ABSOLUTE_KEYS = [...]string {LATENCY} + +const STATS_FIELD_NAME = "value" + +const EVENTD_PUBLISHER_SOURCE = "{\"sonic-events-eventd" + +// Path parameter +const PARAM_HEARTBEAT = "heartbeat" + +type EventClient struct { + + prefix *gnmipb.Path + path *gnmipb.Path + + q *queue.PriorityQueue + channel chan struct{} + + wg *sync.WaitGroup // wait for all sub go routines to finish + + subs_handle unsafe.Pointer + + stopped int + + // Stats counter + counters map[string]uint64 + + last_latencies [LATENCY_LIST_SIZE]uint64 + last_latency_index int + last_latency_full bool + + last_errors uint64 +} + +func Set_heartbeat(val int) { + s := fmt.Sprintf("{\"HEARTBEAT_INTERVAL\":%d}", val) + rc := C.event_set_global_options(C.CString(s)); + if rc != 0 { + log.V(4).Infof("Failed to set heartbeat val=%d rc=%d", val, rc) + } +} + +func C_init_subs() unsafe.Pointer { + return C.events_init_subscriber_wrap(true, C.int(SUBSCRIBER_TIMEOUT)) +} + +func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) { + var evtc EventClient + evtc.prefix = prefix + for _, path := range paths { + // Only one path is expected. Take the last if many + evtc.path = path + } + + for _, e := range evtc.path.GetElem() { + keys := e.GetKey() + for k, v := range keys { + if (k == PARAM_HEARTBEAT) { + if val, err := strconv.Atoi(v); err == nil { + log.V(7).Infof("evtc.heartbeat_interval is set to %d", val) + Set_heartbeat(val) + } + break + } + } + } + + C.swssSetLogPriority(C.int(logLevel)) + + /* Init subscriber with cache use and defined time out */ + evtc.subs_handle = C_init_subs() + evtc.stopped = 0 + + /* Init list & counters */ + evtc.counters = make(map[string]uint64) + + for _, key := range STATS_CUMULATIVE_KEYS { + evtc.counters[key] = 0 + } + + for _, key := range STATS_ABSOLUTE_KEYS { + evtc.counters[key] = 0 + } + + for i := 0; i < len(evtc.last_latencies); i++ { + evtc.last_latencies[i] = 0 + } + evtc.last_latency_index = 0 + evtc.last_errors = 0 + evtc.last_latency_full = false + + log.V(7).Infof("NewEventClient constructed. logLevel=%d", logLevel) + + return &evtc, nil +} + + +func compute_latency(evtc *EventClient) { + if evtc.last_latency_full { + var total uint64 = 0 + + for _, v := range evtc.last_latencies { + if v > 0 { + total += v + } + } + evtc.counters[LATENCY] = (uint64) (total/LATENCY_LIST_SIZE/1000/1000) + } +} + +func update_stats(evtc *EventClient) { + /* Wait for any update */ + db_counters := make(map[string]uint64) + var wr_counters *map[string]uint64 = nil + var rclient *redis.Client + + /* + * This loop pauses until at least one non zero counter. + * This helps add some initial pause before accessing DB + * for existing values. + */ + for evtc.stopped == 0 { + var val uint64 + + compute_latency(evtc) + for _, val = range evtc.counters { + if val != 0 { + break + } + } + if val != 0 { + break + } + time.Sleep(time.Second) + } + + /* Populate counters from DB for cumulative counters. */ + if evtc.stopped == 0 { + ns := sdcfg.GetDbDefaultNamespace() + + rclient = redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: sdcfg.GetDbTcpAddr("COUNTERS_DB", ns), + Password: "", // no password set, + DB: sdcfg.GetDbId("COUNTERS_DB", ns), + DialTimeout:0, + }) + + + // Init current values for cumulative keys and clear for absolute + for _, key := range STATS_CUMULATIVE_KEYS { + fv, err := rclient.HGetAll(key).Result() + if err != nil { + number, errC := strconv.ParseUint(fv[STATS_FIELD_NAME], 10, 64) + if errC == nil { + db_counters[key] = number + } + } + } + for _, key := range STATS_ABSOLUTE_KEYS { + db_counters[key] = 0 + } + } + + /* Main running loop that updates DB */ + for evtc.stopped == 0 { + tmp_counters := make(map[string]uint64) + + // compute latency + compute_latency(evtc) + + for key, val := range evtc.counters { + tmp_counters[key] = val + db_counters[key] + } + tmp_counters[DROPPED] += evtc.last_errors + + if (wr_counters == nil) || !reflect.DeepEqual(tmp_counters, *wr_counters) { + for key, val := range tmp_counters { + sval := strconv.FormatUint(val, 10) + ret, err := rclient.HSet(key, STATS_FIELD_NAME, sval).Result() + if !ret { + log.V(3).Infof("EventClient failed to update COUNTERS key:%s val:%v err:%v", + key, sval, err) + } + } + wr_counters = &tmp_counters + } + time.Sleep(time.Second) + } +} + + +// String returns the target the client is querying. +func (evtc *EventClient) String() string { + return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget()) +} + +var evt_ptr *C.event_receive_op_C_t + +type Evt_rcvd struct { + Event_str string + Missed_cnt uint32 + Publish_epoch_ms int64 +} + +func C_recv_evt(h unsafe.Pointer) (int, Evt_rcvd) { + var evt Evt_rcvd + + rc := (int)(C.event_receive_wrap(h, evt_ptr)) + evt.Event_str = C.GoString((*C.char)(evt_ptr.event_str)) + evt.Missed_cnt = (uint32)(evt_ptr.missed_cnt) + evt.Publish_epoch_ms = (int64)(evt_ptr.publish_epoch_ms) + + return rc, evt +} + +func C_deinit_subs(h unsafe.Pointer) { + C.events_deinit_subscriber_wrap(h) +} + +func get_events(evtc *EventClient) { + str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ)) + defer C.free(unsafe.Pointer(str_ptr)) + + evt_ptr = (*C.event_receive_op_C_t)(C.malloc(C.size_t(unsafe.Sizeof(C.event_receive_op_C_t{})))) + defer C.free(unsafe.Pointer(evt_ptr)) + + evt_ptr.event_str = (*C.char)(str_ptr) + evt_ptr.event_sz = C.uint32_t(EVENT_BUFFSZ) + + for { + + rc, evt := C_recv_evt(evtc.subs_handle) + log.V(7).Infof("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(str_ptr)) + + if rc == 0 { + evtc.counters[MISSED] += (uint64)(evt.Missed_cnt) + qlen := evtc.q.Len() + + if (qlen < PQ_MAX_SIZE) { + evtTv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_StringVal { + StringVal: evt.Event_str, + }} + if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil { + return + } + } else { + evtc.counters[DROPPED] += 1 + } + } + if evtc.stopped == 1 { + log.V(1).Infof("%v stop channel closed, exiting get_events routine", evtc) + C_deinit_subs(evtc.subs_handle) + evtc.subs_handle = nil + return + } + // TODO: Record missed count in stats table. + // intVar, err := strconv.Atoi(C.GoString((*C.char)(c_mptr))) + } +} + + +func send_event(evtc *EventClient, tv *gnmipb.TypedValue, + timestamp int64) error { + spbv := &spb.Value{ + Prefix: evtc.prefix, + Path: evtc.path, + Timestamp: timestamp, + Val: tv, + } + + if err := evtc.q.Put(Value{spbv}); err != nil { + log.V(3).Infof("Queue error: %v", err) + return err + } + return nil +} + +func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + + evtc.wg = wg + defer evtc.wg.Done() + + evtc.q = q + evtc.channel = stop + + go get_events(evtc) + go update_stats(evtc) + + for { + select { + case <-evtc.channel: + evtc.stopped = 1 + log.V(3).Infof("Channel closed by client") + return + } + } +} + + +func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) { + return nil, nil +} + +func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + +func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + + +func (evtc *EventClient) Close() error { + return nil +} + +func (evtc *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { + return nil +} +func (evtc *EventClient) Capabilities() []gnmipb.ModelData { + return nil +} + +func (c *EventClient) SentOne(val *Value) { + var udiff uint64 + + diff := time.Now().UnixNano() - val.GetTimestamp() + udiff = (uint64)(diff) + + c.last_latencies[c.last_latency_index] = udiff + c.last_latency_index += 1 + if c.last_latency_index >= len(c.last_latencies) { + c.last_latency_index = 0 + c.last_latency_full = true + } +} + +func (c *EventClient) FailedSend() { + c.last_errors += 1 +} + + +// cgo LDFLAGS: -L/sonic/target/files/bullseye -lxswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lxxeventxx -Wl,-rpath,/sonic/target/files/bullseye + diff --git a/sonic_data_client/non_db_client.go b/sonic_data_client/non_db_client.go index c6906c6d..05bb3682 100644 --- a/sonic_data_client/non_db_client.go +++ b/sonic_data_client/non_db_client.go @@ -587,3 +587,9 @@ func (c *NonDbClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda func (c *NonDbClient) Capabilities() []gnmipb.ModelData { return nil } +func (c *NonDbClient) SentOne(val *Value) { +} + +func (c *NonDbClient) FailedSend() { +} + diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index bee41d04..82b8bf7e 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -520,6 +520,12 @@ func (c *TranslClient) Capabilities() []gnmipb.ModelData { func (c *TranslClient) Close() error { return nil } +func (c *TranslClient) SentOne(val *Value) { +} + +func (c *TranslClient) FailedSend() { +} + func getBundleVersion(extensions []*gnmi_extpb.Extension) *string { for _,e := range extensions { diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index b7a927f9..1e9427e9 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "flag" "io/ioutil" + "strconv" "time" log "github.com/golang/glog" @@ -16,7 +17,7 @@ import ( ) var ( - userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} + userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} port = flag.Int("port", -1, "port to listen on") // Certificate files. caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") @@ -41,12 +42,12 @@ func main() { defUserAuth = gnmi.AuthTypes{"jwt": false, "password": false, "cert": false} } - if isFlagPassed("client_auth") { - log.V(1).Infof("client_auth provided") - }else { - log.V(1).Infof("client_auth not provided, using defaults.") - userAuth = defUserAuth - } + if isFlagPassed("client_auth") { + log.V(1).Infof("client_auth provided") + }else { + log.V(1).Infof("client_auth not provided, using defaults.") + userAuth = defUserAuth + } switch { case *port <= 0: @@ -58,8 +59,14 @@ func main() { cfg := &gnmi.Config{} cfg.Port = int64(*port) + cfg.LogLevel = 3 var opts []grpc.ServerOption + if val, err := strconv.Atoi(getflag("v")); err == nil { + cfg.LogLevel = val + log.Errorf("flag: log level %v", cfg.LogLevel) + } + if !*noTLS { var certificate tls.Certificate var err error @@ -69,13 +76,13 @@ func main() { log.Exitf("could not load server key pair: %s", err) } } else { - switch { - case *serverCert == "": - log.Errorf("serverCert must be set.") - return - case *serverKey == "": - log.Errorf("serverKey must be set.") - return + switch { + case *serverCert == "": + log.Errorf("serverCert must be set.") + return + case *serverKey == "": + log.Errorf("serverKey must be set.") + return } certificate, err = tls.LoadX509KeyPair(*serverCert, *serverKey) if err != nil { @@ -144,11 +151,21 @@ func main() { } func isFlagPassed(name string) bool { - found := false - flag.Visit(func(f *flag.Flag) { - if f.Name == name { - found = true - } - }) - return found + found := false + flag.Visit(func(f *flag.Flag) { + if f.Name == name { + found = true + } + }) + return found +} + +func getflag(name string) string { + val := "" + flag.VisitAll(func(f *flag.Flag) { + if f.Name == name { + val = f.Value.String() + } + }) + return val }