diff --git a/context/amf_ran.go b/context/amf_ran.go index 3c34e072..097e03dc 100644 --- a/context/amf_ran.go +++ b/context/amf_ran.go @@ -12,6 +12,7 @@ import ( "net" "strings" + "github.com/omec-project/amf/factory" "github.com/omec-project/amf/logger" "github.com/omec-project/amf/metrics" "github.com/omec-project/amf/protos/sdcoreAmfServer" @@ -73,7 +74,7 @@ func (ran *AmfRan) Remove() { NfStatus: mi.NfStatusDisconnected, NfName: ran.GnbId, }, } - if metrics.Status() { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) } diff --git a/context/amf_ue.go b/context/amf_ue.go index 3ba0babc..c196f86e 100644 --- a/context/amf_ue.go +++ b/context/amf_ue.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/omec-project/amf/factory" "github.com/omec-project/amf/logger" "github.com/omec-project/amf/metrics" "github.com/omec-project/amf/protos/sdcoreAmfServer" @@ -1064,9 +1065,10 @@ func getPublishUeCtxtInfoOp(state fsm.StateType) mi.SubscriberOp { // Collect Ctxt info and publish on Kafka stream func (ueContext *AmfUe) PublishUeCtxtInfo() { - if !metrics.Status() { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { return } + op := getPublishUeCtxtInfoOp(ueContext.State[models.AccessType__3_GPP_ACCESS].Current()) kafkaSmCtxt := mi.CoreSubscriber{} diff --git a/factory/config.go b/factory/config.go index e267f05d..a10ae710 100644 --- a/factory/config.go +++ b/factory/config.go @@ -44,9 +44,10 @@ type Mongodb struct { } type KafkaInfo struct { - BrokerUri string `yaml:"brokerUri,omitempty"` - BrokerPort int `yaml:"brokerPort,omitempty"` - Topic string `yaml:"topicName,omitempty"` + EnableKafka *bool `yaml:"enableKafka,omitempty"` + BrokerUri string `yaml:"brokerUri,omitempty"` + BrokerPort int `yaml:"brokerPort,omitempty"` + Topic string `yaml:"topicName,omitempty"` } type Configuration struct { diff --git a/factory/factory.go b/factory/factory.go index e7ae088d..46a54707 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -34,6 +34,10 @@ func InitConfigFactory(f string) error { if AmfConfig.Configuration.WebuiUri == "" { AmfConfig.Configuration.WebuiUri = "webui:9876" } + if AmfConfig.Configuration.KafkaInfo.EnableKafka == nil { + enableKafka := true + AmfConfig.Configuration.KafkaInfo.EnableKafka = &enableKafka + } } return nil diff --git a/metrics/kafka.go b/metrics/kafka.go index 5f60e1d1..8f5ce4b6 100644 --- a/metrics/kafka.go +++ b/metrics/kafka.go @@ -20,21 +20,17 @@ type Writer struct { kafkaWriter *kafka.Writer } -var ( - StatWriter Writer - Enabled bool = true -) +var StatWriter Writer func InitialiseKafkaStream(config *factory.Configuration) error { - brokerUrl := "" - topicName := "sdcore-data-source-amf" - - if config.KafkaInfo.BrokerUri == "" { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { logger.KafkaLog.Info("Kafka disabled") - Enabled = false return nil } + brokerUrl := "sd-core-kafka-headless:9092" + topicName := "sdcore-data-source-amf" + if config.KafkaInfo.BrokerUri != "" && config.KafkaInfo.BrokerPort != 0 { brokerUrl = fmt.Sprintf("%s:%d", config.KafkaInfo.BrokerUri, config.KafkaInfo.BrokerPort) } @@ -64,10 +60,6 @@ func GetWriter() Writer { return StatWriter } -func Status() bool { - return Enabled -} - func (writer Writer) SendMessage(message []byte) error { msg := kafka.Message{Value: message} err := writer.kafkaWriter.WriteMessages(context.Background(), msg) diff --git a/ngap/handler.go b/ngap/handler.go index d732ebc1..c5b09d33 100644 --- a/ngap/handler.go +++ b/ngap/handler.go @@ -14,6 +14,7 @@ import ( "github.com/omec-project/amf/consumer" "github.com/omec-project/amf/context" + "github.com/omec-project/amf/factory" gmm_message "github.com/omec-project/amf/gmm/message" "github.com/omec-project/amf/logger" "github.com/omec-project/amf/metrics" @@ -642,7 +643,7 @@ func HandleNGSetupRequest(ran *context.AmfRan, message *ngapType.NGAPPDU) { NfStatus: mi.NfStatusConnected, NfName: ran.GnbId, }, } - if metrics.Status() { + if *factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { ran.Log.Errorf("Could not publish NfStatusEvent: %v", err) } diff --git a/service/amf_server.go b/service/amf_server.go index b9edc0bf..83af9d05 100644 --- a/service/amf_server.go +++ b/service/amf_server.go @@ -12,6 +12,7 @@ import ( "os" "github.com/omec-project/amf/context" + "github.com/omec-project/amf/factory" "github.com/omec-project/amf/metrics" "github.com/omec-project/amf/ngap" "github.com/omec-project/amf/protos/sdcoreAmfServer" @@ -69,7 +70,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ }, } - if metrics.Status() { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { log.Printf("Error publishing NfStatusEvent: %v", err) } @@ -91,7 +92,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ NfStatus: mi.NfStatusDisconnected, NfName: req.GnbId, }, } - if metrics.Status() { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { log.Printf("Error publishing NfStatusEvent: %v", err) } @@ -106,7 +107,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ NfStatus: mi.NfStatusConnected, NfName: req.GnbId, }, } - if metrics.Status() { + if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka { if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil { log.Printf("Error publishing NfStatusEvent: %v", err) }