From 692da71ad5c1eb787b41d7f6e0f6e90ec67112d4 Mon Sep 17 00:00:00 2001 From: Ajay Lotan Thakur Date: Sat, 15 Jun 2024 18:26:06 -0400 Subject: [PATCH] Support to disable Kafka (#263) --- context/amf_ran.go | 6 ++++-- context/amf_ue.go | 3 +++ metrics/kafka.go | 17 +++++++++++++++-- ngap/handler.go | 6 ++++-- service/amf_server.go | 19 +++++++++++++------ 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/context/amf_ran.go b/context/amf_ran.go index d52f10d3..3c34e072 100644 --- a/context/amf_ran.go +++ b/context/amf_ran.go @@ -73,8 +73,10 @@ func (ran *AmfRan) Remove() { NfStatus: mi.NfStatusDisconnected, NfName: ran.GnbId, }, } - if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { - ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) + if metrics.Status() { + if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { + ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) + } } ran.SetRanStats(RanDisconnected) diff --git a/context/amf_ue.go b/context/amf_ue.go index 7f6197f4..3ba0babc 100644 --- a/context/amf_ue.go +++ b/context/amf_ue.go @@ -1064,6 +1064,9 @@ func getPublishUeCtxtInfoOp(state fsm.StateType) mi.SubscriberOp { // Collect Ctxt info and publish on Kafka stream func (ueContext *AmfUe) PublishUeCtxtInfo() { + if !metrics.Status() { + return + } op := getPublishUeCtxtInfoOp(ueContext.State[models.AccessType__3_GPP_ACCESS].Current()) kafkaSmCtxt := mi.CoreSubscriber{} diff --git a/metrics/kafka.go b/metrics/kafka.go index 88ab8b49..5f60e1d1 100644 --- a/metrics/kafka.go +++ b/metrics/kafka.go @@ -20,12 +20,21 @@ type Writer struct { kafkaWriter *kafka.Writer } -var StatWriter Writer +var ( + StatWriter Writer + Enabled bool = true +) func InitialiseKafkaStream(config *factory.Configuration) error { - brokerUrl := "sd-core-kafka-headless:9092" + brokerUrl := "" topicName := "sdcore-data-source-amf" + if config.KafkaInfo.BrokerUri == "" { + logger.KafkaLog.Info("Kafka disabled") + Enabled = false + return nil + } + if config.KafkaInfo.BrokerUri != "" && config.KafkaInfo.BrokerPort != 0 { brokerUrl = fmt.Sprintf("%s:%d", config.KafkaInfo.BrokerUri, config.KafkaInfo.BrokerPort) } @@ -55,6 +64,10 @@ func GetWriter() Writer { return StatWriter } +func Status() bool { + return Enabled +} + func (writer Writer) SendMessage(message []byte) error { msg := kafka.Message{Value: message} err := writer.kafkaWriter.WriteMessages(context.Background(), msg) diff --git a/ngap/handler.go b/ngap/handler.go index 783bb74f..d732ebc1 100644 --- a/ngap/handler.go +++ b/ngap/handler.go @@ -642,8 +642,10 @@ func HandleNGSetupRequest(ran *context.AmfRan, message *ngapType.NGAPPDU) { NfStatus: mi.NfStatusConnected, NfName: ran.GnbId, }, } - if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { - ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) + if metrics.Status() { + if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { + ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) + } } } else { ngap_message.SendNGSetupFailure(ran, cause) diff --git a/service/amf_server.go b/service/amf_server.go index 11084619..b9edc0bf 100644 --- a/service/amf_server.go +++ b/service/amf_server.go @@ -68,8 +68,11 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ NfStatus: mi.NfStatusConnected, NfName: req.GnbId, }, } - if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { - log.Printf("Error publishing NfStatusEvent: %v", err) + + if metrics.Status() { + if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { + log.Printf("Error publishing NfStatusEvent: %v", err) + } } } } @@ -88,8 +91,10 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ NfStatus: mi.NfStatusDisconnected, NfName: req.GnbId, }, } - if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { - log.Printf("Error publishing NfStatusEvent: %v", err) + if metrics.Status() { + if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { + log.Printf("Error publishing NfStatusEvent: %v", err) + } } } else if req.Msgtype == sdcoreAmfServer.MsgType_GNB_CONN { log.Println("New GNB Connected ") @@ -101,8 +106,10 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ NfStatus: mi.NfStatusConnected, NfName: req.GnbId, }, } - if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { - log.Printf("Error publishing NfStatusEvent: %v", err) + if metrics.Status() { + if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { + log.Printf("Error publishing NfStatusEvent: %v", err) + } } } else { ngap.DispatchLb(req, Amf2RanMsgChan)