Skip to content

Commit

Permalink
Using enableKafka variable to control kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
thakurajayL committed Jun 16, 2024
1 parent 692da71 commit 44d178b
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 22 deletions.
3 changes: 2 additions & 1 deletion context/amf_ran.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"strings"

"github.com/omec-project/amf/factory"
"github.com/omec-project/amf/logger"
"github.com/omec-project/amf/metrics"
"github.com/omec-project/amf/protos/sdcoreAmfServer"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (ran *AmfRan) Remove() {
NfStatus: mi.NfStatusDisconnected, NfName: ran.GnbId,
},
}
if metrics.Status() {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil {
ran.Log.Errorf("Could not publish NfStatusEvent: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion context/amf_ue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/omec-project/amf/factory"
"github.com/omec-project/amf/logger"
"github.com/omec-project/amf/metrics"
"github.com/omec-project/amf/protos/sdcoreAmfServer"
Expand Down Expand Up @@ -1064,9 +1065,10 @@ func getPublishUeCtxtInfoOp(state fsm.StateType) mi.SubscriberOp {

// Collect Ctxt info and publish on Kafka stream
func (ueContext *AmfUe) PublishUeCtxtInfo() {
if !metrics.Status() {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
return
}

op := getPublishUeCtxtInfoOp(ueContext.State[models.AccessType__3_GPP_ACCESS].Current())
kafkaSmCtxt := mi.CoreSubscriber{}

Expand Down
7 changes: 4 additions & 3 deletions factory/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type Mongodb struct {
}

type KafkaInfo struct {
BrokerUri string `yaml:"brokerUri,omitempty"`
BrokerPort int `yaml:"brokerPort,omitempty"`
Topic string `yaml:"topicName,omitempty"`
EnableKafka *bool `yaml:"enableKafka,omitempty"`
BrokerUri string `yaml:"brokerUri,omitempty"`
BrokerPort int `yaml:"brokerPort,omitempty"`
Topic string `yaml:"topicName,omitempty"`
}

type Configuration struct {
Expand Down
4 changes: 4 additions & 0 deletions factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func InitConfigFactory(f string) error {
if AmfConfig.Configuration.WebuiUri == "" {
AmfConfig.Configuration.WebuiUri = "webui:9876"
}
if AmfConfig.Configuration.KafkaInfo.EnableKafka == nil {
enableKafka := true
AmfConfig.Configuration.KafkaInfo.EnableKafka = &enableKafka
}
}

return nil
Expand Down
18 changes: 5 additions & 13 deletions metrics/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@ type Writer struct {
kafkaWriter *kafka.Writer
}

var (
StatWriter Writer
Enabled bool = true
)
var StatWriter Writer

func InitialiseKafkaStream(config *factory.Configuration) error {
brokerUrl := ""
topicName := "sdcore-data-source-amf"

if config.KafkaInfo.BrokerUri == "" {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
logger.KafkaLog.Info("Kafka disabled")
Enabled = false
return nil
}

brokerUrl := "sd-core-kafka-headless:9092"
topicName := "sdcore-data-source-amf"

if config.KafkaInfo.BrokerUri != "" && config.KafkaInfo.BrokerPort != 0 {
brokerUrl = fmt.Sprintf("%s:%d", config.KafkaInfo.BrokerUri, config.KafkaInfo.BrokerPort)
}
Expand Down Expand Up @@ -64,10 +60,6 @@ func GetWriter() Writer {
return StatWriter
}

func Status() bool {
return Enabled
}

func (writer Writer) SendMessage(message []byte) error {
msg := kafka.Message{Value: message}
err := writer.kafkaWriter.WriteMessages(context.Background(), msg)
Expand Down
3 changes: 2 additions & 1 deletion ngap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/omec-project/amf/consumer"
"github.com/omec-project/amf/context"
"github.com/omec-project/amf/factory"
gmm_message "github.com/omec-project/amf/gmm/message"
"github.com/omec-project/amf/logger"
"github.com/omec-project/amf/metrics"
Expand Down Expand Up @@ -642,7 +643,7 @@ func HandleNGSetupRequest(ran *context.AmfRan, message *ngapType.NGAPPDU) {
NfStatus: mi.NfStatusConnected, NfName: ran.GnbId,
},
}
if metrics.Status() {
if *factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil {
ran.Log.Errorf("Could not publish NfStatusEvent: %v", err)
}
Expand Down
7 changes: 4 additions & 3 deletions service/amf_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

"github.com/omec-project/amf/context"
"github.com/omec-project/amf/factory"
"github.com/omec-project/amf/metrics"
"github.com/omec-project/amf/ngap"
"github.com/omec-project/amf/protos/sdcoreAmfServer"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ
},
}

if metrics.Status() {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil {
log.Printf("Error publishing NfStatusEvent: %v", err)
}
Expand All @@ -91,7 +92,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ
NfStatus: mi.NfStatusDisconnected, NfName: req.GnbId,
},
}
if metrics.Status() {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil {
log.Printf("Error publishing NfStatusEvent: %v", err)
}
Expand All @@ -106,7 +107,7 @@ func (s *Server) HandleMessage(srv sdcoreAmfServer.NgapService_HandleMessageServ
NfStatus: mi.NfStatusConnected, NfName: req.GnbId,
},
}
if metrics.Status() {
if !*factory.AmfConfig.Configuration.KafkaInfo.EnableKafka {
if err := metrics.StatWriter.PublishNfStatusEvent(gnbStatus); err != nil {
log.Printf("Error publishing NfStatusEvent: %v", err)
}
Expand Down

0 comments on commit 44d178b

Please sign in to comment.