Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(io): restore edgex source/sink #2998

Merged
merged 3 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions etc/sources/edgex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,4 @@ share_conf: #Conf_key
server: 127.0.0.1
port: 1883
topic: events
type: mqtt
connectionSelector: edgex.redisMsgBus
connectionSelector: redisMsgBus
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.1.0
github.com/edgexfoundry/go-mod-messaging/v3 v3.1.0
github.com/fxamacker/cbor/v2 v2.6.0
github.com/gdexlab/go-render v1.0.1
github.com/go-sql-driver/mysql v1.8.1
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/protobuf v1.5.4
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba
github.com/google/uuid v1.6.0
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/jhump/protoreflect v1.15.6
github.com/jinzhu/now v1.1.5
github.com/keepeye/logrus-filename v0.0.0-20190711075016-ce01a4391dd1
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand All @@ -89,7 +89,7 @@ require (
github.com/go-playground/validator/v10 v10.19.0 // indirect
github.com/go-redis/redis/v7 v7.4.1 // indirect
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -127,6 +127,7 @@ require (
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
Expand Down
29 changes: 29 additions & 0 deletions internal/binder/io/ext_edgex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build edgex || full

package io

import (
"github.com/lf-edge/ekuiper/v2/internal/io/edgex"
edgexCon "github.com/lf-edge/ekuiper/v2/internal/io/edgex/client"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

func init() {
modules.RegisterConnection("edgex", edgexCon.GetConnection)
modules.RegisterSource("edgex", edgex.GetSource)
modules.RegisterSink("edgex", edgex.GetSink)
}
189 changes: 189 additions & 0 deletions internal/io/edgex/client/edgex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"fmt"
"strings"

"github.com/edgexfoundry/go-mod-messaging/v3/messaging"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

type Client struct {
mbconf types.MessageBusConfig
client messaging.MessageClient
}

func (es *Client) Ping(_ api.StreamContext) error {
if es.client != nil {
return nil

Check warning on line 37 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}
return fmt.Errorf("client is nil")

Check warning on line 39 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L39

Added line #L39 was not covered by tests
}

func (es *Client) DetachSub(ctx api.StreamContext, props map[string]any) {
topic, ok := props["topic"]
ctx.GetLogger().Infof("detach edgex sub %v", topic)
if ok {
err := es.client.Unsubscribe(topic.(string))
if err != nil {
ctx.GetLogger().Error(err)

Check warning on line 48 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L42-L48

Added lines #L42 - L48 were not covered by tests
}
}
}

func (es *Client) Close(ctx api.StreamContext) error {
if es.client != nil {
return es.client.Disconnect()

Check warning on line 55 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L53-L55

Added lines #L53 - L55 were not covered by tests
}
return nil

Check warning on line 57 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L57

Added line #L57 was not covered by tests
}

var _ modules.Connection = &Client{}

func GetConnection(ctx api.StreamContext, props map[string]any) (modules.Connection, error) {
ctx.GetLogger().Infof("connect to edgex")
c := &Client{}
err := c.CfgValidate(props)
if err != nil {
return nil, err

Check warning on line 67 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L62-L67

Added lines #L62 - L67 were not covered by tests
}
client, err := messaging.NewMessageClient(c.mbconf)
if err != nil {
return nil, err

Check warning on line 71 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L69-L71

Added lines #L69 - L71 were not covered by tests
}

if err := client.Connect(); err != nil {
conf.Log.Errorf("The connection to edgex messagebus failed.")
return nil, fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())

Check warning on line 76 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L74-L76

Added lines #L74 - L76 were not covered by tests
}
conf.Log.Infof("The connection to edgex messagebus is established successfully.")

Check warning on line 78 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L78

Added line #L78 was not covered by tests

c.client = client
return c, nil

Check warning on line 81 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}

type EdgexConf struct {
Protocol string `json:"protocol"`
Server string `json:"server"`
Host string `json:"host"`
Port int `json:"port"`
Type string `json:"type"`
Optional map[string]string `json:"optional"`
}

// Modify the copied conf to print no password.
func printConf(mbconf types.MessageBusConfig) {
printableOptional := make(map[string]string)
for k, v := range mbconf.Optional {
if strings.EqualFold(k, "password") {
printableOptional[k] = "*"

Check warning on line 98 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L98

Added line #L98 was not covered by tests
} else {
printableOptional[k] = v
}
}
mbconf.Optional = printableOptional
conf.Log.Infof("Use configuration for edgex messagebus %v", mbconf)
}

func (es *Client) CfgValidate(props map[string]interface{}) error {
edgeAddr := "localhost"
c := &EdgexConf{
Protocol: "redis",
Port: 6379,
Type: messaging.Redis,
Optional: nil,
}

if o, ok := props["optional"]; ok {
switch ot := o.(type) {
case map[string]string:
c.Optional = ot
case map[string]interface{}:
c.Optional = make(map[string]string)
for k, v := range ot {
c.Optional[k] = fmt.Sprintf("%v", v)
}
default:
return fmt.Errorf("invalid optional config %v, must be a map", o)

Check warning on line 126 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}
delete(props, "optional")
}

err := cast.MapToStruct(props, c)
if err != nil {
return fmt.Errorf("map config map to struct fail with error: %v", err)
}

if c.Host != "" {
edgeAddr = c.Host
} else if c.Server != "" {
edgeAddr = c.Server
}

if c.Type != messaging.MQTT && c.Type != messaging.Redis &&
c.Type != messaging.NatsCore && c.Type != messaging.NatsJetStream {
return fmt.Errorf("specified wrong type value %s", c.Type)
}
if c.Port < 0 {
return fmt.Errorf("specified wrong port value, expect positive integer but got %d", c.Port)
}

mbconf := types.MessageBusConfig{
Broker: types.HostInfo{
Host: edgeAddr,
Port: c.Port,
Protocol: c.Protocol,
},
Type: c.Type,
}
mbconf.Optional = c.Optional
es.mbconf = mbconf

printConf(mbconf)

return nil
}

func (es *Client) Publish(env types.MessageEnvelope, topic string) error {
if err := es.client.Publish(env, topic); err != nil {
conf.Log.Errorf("Publish to topic %s has error : %s.", topic, err.Error())
return fmt.Errorf("Failed to publish to edgex message bus: " + err.Error())

Check warning on line 169 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L166-L169

Added lines #L166 - L169 were not covered by tests
}
return nil

Check warning on line 171 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L171

Added line #L171 was not covered by tests
}

func (es *Client) Subscribe(msg chan types.MessageEnvelope, topic string, err chan error) error {
topics := []types.TopicChannel{{Topic: topic, Messages: msg}}
if err := es.client.Subscribe(topics, err); err != nil {
conf.Log.Errorf("Failed to subscribe to edgex messagebus with topic %s has error : %s.", topic, err.Error())
return err

Check warning on line 178 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L174-L178

Added lines #L174 - L178 were not covered by tests
}
return nil

Check warning on line 180 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L180

Added line #L180 was not covered by tests
}

func (es *Client) Disconnect() error {
conf.Log.Infof("Closing the connection to edgex messagebus.")
if e := es.client.Disconnect(); e != nil {
return e

Check warning on line 186 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L183-L186

Added lines #L183 - L186 were not covered by tests
}
return nil

Check warning on line 188 in internal/io/edgex/client/edgex.go

View check run for this annotation

Codecov / codecov/patch

internal/io/edgex/client/edgex.go#L188

Added line #L188 was not covered by tests
}
Loading
Loading