From 1de1604a6664e7002347f5109b7874fec7a795df Mon Sep 17 00:00:00 2001 From: qclc <39878907+qclc@users.noreply.github.com> Date: Tue, 21 Sep 2021 21:47:55 +0800 Subject: [PATCH] Update deviceController to reconcile the deviceService between the openyurt and edge --- api/v1alpha1/deviceservice_types.go | 45 ++- api/v1alpha1/zz_generated.deepcopy.go | 10 +- clients/edgex-foundry/deviceprofile_client.go | 5 - clients/edgex-foundry/deviceservice_client.go | 248 ++++++++++++++++ clients/edgex-foundry/util.go | 138 +++++++++ clients/errors.go | 7 +- .../device.openyurt.io_deviceservices.yaml | 242 ++++++++++------ controllers/deviceservice_controller.go | 265 ++++++++++++----- controllers/deviceservice_syncer.go | 270 +++++++++++------- go.mod | 14 +- main.go | 6 +- 11 files changed, 965 insertions(+), 285 deletions(-) create mode 100644 clients/edgex-foundry/deviceservice_client.go create mode 100644 clients/edgex-foundry/util.go diff --git a/api/v1alpha1/deviceservice_types.go b/api/v1alpha1/deviceservice_types.go index ad0cb6e..5739276 100644 --- a/api/v1alpha1/deviceservice_types.go +++ b/api/v1alpha1/deviceservice_types.go @@ -18,6 +18,15 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4" +) + +const ( + DeviceServiceFinalizer = "v1alpha1.deviceService.finalizer" + // DeviceServiceSyncedCondition indicates that the deviceService exists in both OpenYurt and edge platform + DeviceServiceSyncedCondition clusterv1.ConditionType = "DeviceServiceSynced" + // DeviceServiceManagingCondition indicates that the deviceService is being managed by cloud and its field are being reconciled + DeviceServiceManagingCondition clusterv1.ConditionType = "DeviceServiceManaging" ) type Addressable struct { @@ -47,15 +56,8 @@ type Addressable struct { // DeviceServiceSpec defines the desired state of DeviceService type DeviceServiceSpec struct { + // Information describing the device Description string `json:"description,omitempty"` - // the Id assigned by the EdgeX foundry - // TODO store this field in the status - Id string `json:"id,omitempty"` - // TODO store this field in the status - LastConnected int64 `json:"lastConnected,omitempty"` - // time in milliseconds that the device last reported data to the core - // TODO store this field in the status - LastReported int64 `json:"lastReported,omitempty"` // operational state - either enabled or disabled OperatingState OperatingState `json:"operatingState,omitempty"` // tags or other labels applied to the device service for search or other @@ -66,11 +68,28 @@ type DeviceServiceSpec struct { Addressable Addressable `json:"addressable,omitempty"` // Device Service Admin State AdminState AdminState `json:"adminState,omitempty"` + // True means deviceService is managed by cloud, cloud can update the related fields + // False means cloud can't update the fields + Managed bool `json:"managed,omitempty"` + // NodePool indicates which nodePool the deviceService comes from + NodePool string `json:"nodePool,omitempty"` } // DeviceServiceStatus defines the observed state of DeviceService type DeviceServiceStatus struct { - AddedToEdgeX bool `json:"addedToEdgeX,omitempty"` + // Synced indicates whether the device already exists on both OpenYurt and edge platform + Synced bool `json:"synced,omitempty"` + // the Id assigned by the edge platform + EdgeId string `json:"edgeId,omitempty"` + // time in milliseconds that the device last reported data to the core + LastConnected int64 `json:"lastConnected,omitempty"` + // time in milliseconds that the device last reported data to the core + LastReported int64 `json:"lastReported,omitempty"` + // Device Service Admin State + AdminState AdminState `json:"adminState,omitempty"` + // current deviceService state + // +optional + Conditions clusterv1.Conditions `json:"conditions,omitempty"` } //+kubebuilder:object:root=true @@ -85,6 +104,14 @@ type DeviceService struct { Status DeviceServiceStatus `json:"status,omitempty"` } +func (ds *DeviceService) SetConditions(conditions clusterv1.Conditions) { + ds.Status.Conditions = conditions +} + +func (ds *DeviceService) GetConditions() clusterv1.Conditions { + return ds.Status.Conditions +} + //+kubebuilder:object:root=true // DeviceServiceList contains a list of DeviceService diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 1bdd71c..6868672 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/cluster-api/api/v1alpha4" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -311,7 +312,7 @@ func (in *DeviceService) DeepCopyInto(out *DeviceService) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeviceService. @@ -388,6 +389,13 @@ func (in *DeviceServiceSpec) DeepCopy() *DeviceServiceSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeviceServiceStatus) DeepCopyInto(out *DeviceServiceStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make(v1alpha4.Conditions, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeviceServiceStatus. diff --git a/clients/edgex-foundry/deviceprofile_client.go b/clients/edgex-foundry/deviceprofile_client.go index d04bd62..61bf74e 100644 --- a/clients/edgex-foundry/deviceprofile_client.go +++ b/clients/edgex-foundry/deviceprofile_client.go @@ -41,11 +41,6 @@ type EdgexDeviceProfile struct { logr.Logger } -const ( - DeviceProfilePath = "/api/v1/deviceprofile" - EdgeXObjectName = "device-controller/edgex-object.name" -) - func NewEdgexDeviceProfile(host string, port int, log logr.Logger) *EdgexDeviceProfile { return &EdgexDeviceProfile{ Client: resty.New(), diff --git a/clients/edgex-foundry/deviceservice_client.go b/clients/edgex-foundry/deviceservice_client.go new file mode 100644 index 0000000..c316d93 --- /dev/null +++ b/clients/edgex-foundry/deviceservice_client.go @@ -0,0 +1,248 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package edgex_foundry + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + + "github.com/edgexfoundry/go-mod-core-contracts/models" + "github.com/go-logr/logr" + "github.com/go-resty/resty/v2" + "github.com/openyurtio/device-controller/api/v1alpha1" + edgeCli "github.com/openyurtio/device-controller/clients" +) + +type EdgexDeviceServiceClient struct { + *resty.Client + CoreMetaClient ClientURL + logr.Logger +} + +func NewEdgexDeviceServiceClient(coreMetaClient ClientURL, log logr.Logger) *EdgexDeviceServiceClient { + return &EdgexDeviceServiceClient{ + Client: resty.New(), + CoreMetaClient: coreMetaClient, + Logger: log, + } +} + +// Create function sends a POST request to EdgeX to add a new deviceService +func (eds *EdgexDeviceServiceClient) Create(ctx context.Context, deviceservice *v1alpha1.DeviceService, options edgeCli.CreateOptions) (*v1alpha1.DeviceService, error) { + ds := toEdgexDeviceService(deviceservice) + eds.V(5).Info("will add the DeviceServices", + "DeviceService", ds.Name) + dpJson, err := json.Marshal(&ds) + if err != nil { + return nil, err + } + postPath := fmt.Sprintf("http://%s:%d%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath) + resp, err := eds.R(). + SetBody(dpJson).Post(postPath) + if err != nil { + return nil, err + } else if resp.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("create deviceService on edgex foundry failed, the response is : %s", resp.Body()) + } + createdDs := deviceservice.DeepCopy() + createdDs.Status.EdgeId = string(resp.Body()) + return createdDs, err +} + +// Delete function sends a request to EdgeX to delete a deviceService +func (eds *EdgexDeviceServiceClient) Delete(ctx context.Context, name string, option edgeCli.DeleteOptions) error { + eds.V(5).Info("will delete the DeviceService", + "DeviceService", name) + delURL := fmt.Sprintf("http://%s:%d%s/name/%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name) + resp, err := eds.R().Delete(delURL) + if err != nil { + return err + } + if string(resp.Body()) != "true" { + return errors.New(string(resp.Body())) + } + return nil +} + +// Update is used to set the admin or operating state of the deviceService by unique name of the deviceService. +// TODO support to update other fields +func (eds *EdgexDeviceServiceClient) Update(ctx context.Context, ds *v1alpha1.DeviceService, options edgeCli.UpdateOptions) (*v1alpha1.DeviceService, error) { + actualDSName := getEdgeDeviceServiceName(ds) + putBaseURL := fmt.Sprintf("http://%s:%d%s/name/%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, actualDSName) + if ds == nil { + return nil, nil + } + if ds.Spec.AdminState != "" { + amURL := fmt.Sprintf("%s/adminstate/%s", putBaseURL, ds.Spec.AdminState) + if rep, err := resty.New().R().SetHeader("Content-Type", "application/json").Put(amURL); err != nil { + return nil, err + } else if rep.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("failed to update deviceService: %s, get response: %s", actualDSName, string(rep.Body())) + } + } + if ds.Spec.OperatingState != "" { + opURL := fmt.Sprintf("%s/opstate/%s", putBaseURL, ds.Spec.OperatingState) + if rep, err := resty.New().R(). + SetHeader("Content-Type", "application/json").Put(opURL); err != nil { + return nil, err + } else if rep.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("failed to update deviceService: %s, get response: %s", actualDSName, string(rep.Body())) + } + } + + return ds, nil +} + +// Get is used to query the deviceService information corresponding to the deviceService name +func (eds *EdgexDeviceServiceClient) Get(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.DeviceService, error) { + eds.V(5).Info("will get DeviceServices", + "DeviceService", name) + var ds v1alpha1.DeviceService + getURL := fmt.Sprintf("http://%s:%d%s/name/%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name) + resp, err := eds.R().Get(getURL) + if err != nil { + return &ds, err + } + if string(resp.Body()) == "Item not found\n" || + strings.HasPrefix(string(resp.Body()), "no item found") { + return &ds, errors.New("Item not found") + } + var dp models.DeviceService + err = json.Unmarshal(resp.Body(), &dp) + ds = toKubeDeviceService(dp) + return &ds, err +} + +// List is used to get all deviceService objects on edge platform +// The Hanoi version currently supports only a single label and does not support other filters +func (eds *EdgexDeviceServiceClient) List(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.DeviceService, error) { + eds.V(5).Info("will list DeviceServices") + lp := fmt.Sprintf("http://%s:%d%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath) + if options.LabelSelector != nil { + if _, ok := options.LabelSelector["label"]; ok { + lp = strings.Join([]string{lp, strings.Join([]string{"label", options.LabelSelector["label"]}, "/")}, "/") + } + } + resp, err := eds.R(). + EnableTrace(). + Get(lp) + if err != nil { + return nil, err + } + dss := []models.DeviceService{} + if err := json.Unmarshal(resp.Body(), &dss); err != nil { + return nil, err + } + var res []v1alpha1.DeviceService + for _, ds := range dss { + res = append(res, toKubeDeviceService(ds)) + } + return res, nil +} + +// CreateAddressable function sends a POST request to EdgeX to add a new addressable +func (eds *EdgexDeviceServiceClient) CreateAddressable(ctx context.Context, addressable *v1alpha1.Addressable, options edgeCli.CreateOptions) (*v1alpha1.Addressable, error) { + as := toEdgeXAddressable(addressable) + eds.V(5).Info("will add the Addressables", + "Addressable", as.Name) + dpJson, err := json.Marshal(&as) + if err != nil { + return nil, err + } + postPath := fmt.Sprintf("http://%s:%d%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath) + resp, err := eds.R(). + SetBody(dpJson).Post(postPath) + if err != nil { + return nil, err + } + createdAddr := addressable.DeepCopy() + createdAddr.Id = string(resp.Body()) + return createdAddr, err +} + +// DeleteAddressable function sends a request to EdgeX to delete a addressable +func (eds *EdgexDeviceServiceClient) DeleteAddressable(ctx context.Context, name string, options edgeCli.DeleteOptions) error { + eds.V(5).Info("will delete the Addressable", + "Addressable", name) + delURL := fmt.Sprintf("http://%s:%d%s/name/%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name) + resp, err := eds.R().Delete(delURL) + if err != nil { + return err + } + if string(resp.Body()) != "true" { + return errors.New(string(resp.Body())) + } + return nil +} + +// UpdateAddressable is used to update the addressable on edgex foundry +func (eds *EdgexDeviceServiceClient) UpdateAddressable(ctx context.Context, device *v1alpha1.Addressable, options edgeCli.UpdateOptions) (*v1alpha1.Addressable, error) { + return nil, nil +} + +// GetAddressable is used to query the addressable information corresponding to the addressable name +func (eds *EdgexDeviceServiceClient) GetAddressable(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.Addressable, error) { + eds.V(5).Info("will get Addressables", + "Addressable", name) + var addressable v1alpha1.Addressable + getURL := fmt.Sprintf("http://%s:%d%s/name/%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name) + resp, err := eds.R().Get(getURL) + if err != nil { + return &addressable, err + } + if string(resp.Body()) == "Item not found\n" { + return &addressable, errors.New("Item not found") + } + var maddr models.Addressable + err = json.Unmarshal(resp.Body(), &maddr) + addressable = toKubeAddressable(maddr) + return &addressable, err +} + +// ListAddressables is used to get all addressable objects on edge platform +func (eds *EdgexDeviceServiceClient) ListAddressables(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.Addressable, error) { + eds.V(5).Info("will list Addressables") + lp := fmt.Sprintf("http://%s:%d%s", + eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath) + resp, err := eds.R(). + EnableTrace(). + Get(lp) + if err != nil { + return nil, err + } + ass := []models.Addressable{} + if err := json.Unmarshal(resp.Body(), &ass); err != nil { + return nil, err + } + var res []v1alpha1.Addressable + for i := range ass { + res = append(res, toKubeAddressable(ass[i])) + } + return res, nil +} diff --git a/clients/edgex-foundry/util.go b/clients/edgex-foundry/util.go new file mode 100644 index 0000000..ec2903b --- /dev/null +++ b/clients/edgex-foundry/util.go @@ -0,0 +1,138 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package edgex_foundry + +import ( + "strings" + + "github.com/edgexfoundry/go-mod-core-contracts/models" + devicev1alpha1 "github.com/openyurtio/device-controller/api/v1alpha1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + EdgeXObjectName = "device-controller/edgex-object.name" + DeviceServicePath = "/api/v1/deviceservice" + DeviceProfilePath = "/api/v1/deviceprofile" + AddressablePath = "/api/v1/addressable" +) + +type ClientURL struct { + Host string + Port int +} + +func getEdgeDeviceServiceName(ds *devicev1alpha1.DeviceService) string { + var actualDSName string + if _, ok := ds.ObjectMeta.Labels[EdgeXObjectName]; ok { + actualDSName = ds.ObjectMeta.Labels[EdgeXObjectName] + } else { + actualDSName = ds.GetName() + } + return actualDSName +} + +func toEdgexDeviceService(ds *devicev1alpha1.DeviceService) models.DeviceService { + return models.DeviceService{ + DescribedObject: models.DescribedObject{ + Description: ds.Spec.Description, + }, + Name: ds.GetName(), + //Id: ds.Spec.Id, + LastConnected: ds.Status.LastConnected, + LastReported: ds.Status.LastReported, + OperatingState: models.OperatingState(ds.Spec.OperatingState), + Labels: ds.Spec.Labels, + AdminState: models.AdminState(ds.Spec.AdminState), + Addressable: toEdgeXAddressable(&ds.Spec.Addressable), + } +} + +func toEdgeXAddressable(a *devicev1alpha1.Addressable) models.Addressable { + return models.Addressable{ + Id: a.Id, + Name: a.Name, + Protocol: a.Protocol, + HTTPMethod: a.HTTPMethod, + Address: a.Address, + Port: a.Port, + Path: a.Path, + Publisher: a.Publisher, + User: a.User, + Password: a.Password, + Topic: a.Topic, + } +} + +func toKubeDeviceService(ds models.DeviceService) devicev1alpha1.DeviceService { + return devicev1alpha1.DeviceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: strings.ToLower(ds.Name), + Namespace: "default", + Labels: map[string]string{ + EdgeXObjectName: ds.Name, + }, + }, + Spec: devicev1alpha1.DeviceServiceSpec{ + Description: ds.Description, + OperatingState: toKubeOperatingState(ds.OperatingState), + Labels: ds.Labels, + Addressable: toKubeAddressable(ds.Addressable), + AdminState: toKubeAdminState(ds.AdminState), + }, + Status: devicev1alpha1.DeviceServiceStatus{ + EdgeId: ds.Id, + LastConnected: ds.LastConnected, + LastReported: ds.LastReported, + AdminState: toKubeAdminState(ds.AdminState), + }, + } +} + +func toKubeAddressable(ad models.Addressable) devicev1alpha1.Addressable { + return devicev1alpha1.Addressable{ + Id: ad.Id, + Name: ad.Name, + Protocol: ad.Protocol, + HTTPMethod: ad.HTTPMethod, + Address: ad.Address, + Port: ad.Port, + Path: ad.Path, + Publisher: ad.Publisher, + User: ad.User, + Password: ad.Password, + Topic: ad.Topic, + } +} + +// toKubeDevice serialize the EdgeX AdminState to the corresponding Kubernetes AdminState +func toKubeAdminState(ea models.AdminState) devicev1alpha1.AdminState { + if ea == models.Locked { + return devicev1alpha1.Locked + } + return devicev1alpha1.UnLocked +} + +// toKubeDevice serialize the EdgeX OperatingState to the corresponding +// Kubernetes OperatingState +func toKubeOperatingState(ea models.OperatingState) devicev1alpha1.OperatingState { + if ea == models.Enabled { + return devicev1alpha1.Enabled + } + return devicev1alpha1.Disabled +} diff --git a/clients/errors.go b/clients/errors.go index 2c172aa..c14e61d 100644 --- a/clients/errors.go +++ b/clients/errors.go @@ -16,7 +16,6 @@ limitations under the License. package clients -// import "errors" import "strings" type NotFoundError struct{} @@ -24,6 +23,8 @@ type NotFoundError struct{} func (e *NotFoundError) Error() string { return "Item not found" } func IsNotFoundErr(err error) bool { - return err.Error() == "Item not found" || strings.HasPrefix(err.Error(), "no item found") - // return errors.Is(err, &NotFoundError{}) + if err == nil { + return false + } + return strings.Contains(err.Error(), "not found") || strings.HasPrefix(err.Error(), "no item found") } diff --git a/config/crd/bases/device.openyurt.io_deviceservices.yaml b/config/crd/bases/device.openyurt.io_deviceservices.yaml index bc39d36..4534586 100644 --- a/config/crd/bases/device.openyurt.io_deviceservices.yaml +++ b/config/crd/bases/device.openyurt.io_deviceservices.yaml @@ -16,104 +16,162 @@ spec: singular: deviceservice scope: Namespaced versions: - - name: v1alpha1 - schema: - openAPIV3Schema: - description: DeviceService is the Schema for the deviceservices API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation + - name: v1alpha1 + schema: + openAPIV3Schema: + description: DeviceService is the Schema for the deviceservices API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: DeviceServiceSpec defines the desired state of DeviceService - properties: - addressable: - description: address (MQTT topic, HTTP address, serial bus, etc.) - for reaching the service - properties: - address: - description: Address of the addressable - type: string - id: - description: ID is a unique identifier for the Addressable, such - as a UUID - type: string - method: - description: Method for connecting (i.e. POST) - type: string - name: - description: Name is a unique name given to the Addressable - type: string - password: - description: Password of the user for authentication for the addressable - type: string - path: - description: Path for callbacks - type: string - port: - description: Port for the address - type: integer - protocol: - description: Protocol for the address (HTTP/TCP) - type: string - publisher: - description: For message bus protocols - type: string - topic: - description: Topic for message bus addressables - type: string - user: - description: User id for authentication + type: string + metadata: + type: object + spec: + description: DeviceServiceSpec defines the desired state of DeviceService + properties: + addressable: + description: address (MQTT topic, HTTP address, serial bus, etc.) + for reaching the service + properties: + address: + description: Address of the addressable + type: string + id: + description: ID is a unique identifier for the Addressable, such + as a UUID + type: string + method: + description: Method for connecting (i.e. POST) + type: string + name: + description: Name is a unique name given to the Addressable + type: string + password: + description: Password of the user for authentication for the addressable + type: string + path: + description: Path for callbacks + type: string + port: + description: Port for the address + type: integer + protocol: + description: Protocol for the address (HTTP/TCP) + type: string + publisher: + description: For message bus protocols + type: string + topic: + description: Topic for message bus addressables + type: string + user: + description: User id for authentication + type: string + type: object + adminState: + description: Device Service Admin State + type: string + description: + description: Information describing the device + type: string + labels: + description: tags or other labels applied to the device service for + search or other identification needs on the EdgeX Foundry + items: type: string - type: object - adminState: - description: Device Service Admin State - type: string - description: - type: string - id: - description: the Id assigned by the EdgeX foundry TODO store this - field in the status - type: string - labels: - description: tags or other labels applied to the device service for - search or other identification needs on the EdgeX Foundry - items: + type: array + managed: + description: True means deviceService is managed by cloud, cloud can + update the related fields False means cloud can't update the fields + type: boolean + nodePool: + description: NodePool indicates which nodePool the deviceService comes + from + type: string + operatingState: + description: operational state - either enabled or disabled + type: string + type: object + status: + description: DeviceServiceStatus defines the observed state of DeviceService + properties: + adminState: + description: Device Service Admin State + type: string + conditions: + description: current deviceService state + items: + description: Condition defines an observation of a Cluster API resource + operational state. + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status + to another. This should be when the underlying condition changed. + If that is not known, then using the time when the API field + changed is acceptable. + format: date-time + type: string + message: + description: A human readable message indicating details about + the transition. This field may be empty. + type: string + reason: + description: The reason for the condition's last transition + in CamelCase. The specific API may choose whether or not this + field is considered a guaranteed API. This field may not be + empty. + type: string + severity: + description: Severity provides an explicit classification of + Reason code, so the users or machines can immediately understand + the current situation and act accordingly. The Severity field + MUST be set only when Status=False. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition in CamelCase or in foo.example.com/CamelCase. + Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. + type: string + required: + - status + - type + type: object + type: array + edgeId: + description: the Id assigned by the edge platform type: string - type: array - lastConnected: - description: TODO store this field in the status - format: int64 - type: integer - lastReported: - description: time in milliseconds that the device last reported data - to the core TODO store this field in the status - format: int64 - type: integer - operatingState: - description: operational state - either enabled or disabled - type: string - type: object - status: - description: DeviceServiceStatus defines the observed state of DeviceService - properties: - addedToEdgeX: - type: boolean - type: object - type: object - served: true - storage: true - subresources: - status: {} + lastConnected: + description: time in milliseconds that the device last reported data + to the core + format: int64 + type: integer + lastReported: + description: time in milliseconds that the device last reported data + to the core + format: int64 + type: integer + synced: + description: Synced indicates whether the device already exists on + both OpenYurt and edge platform + type: boolean + type: object + type: object + served: true + storage: true + subresources: + status: {} status: acceptedNames: kind: "" diff --git a/controllers/deviceservice_controller.go b/controllers/deviceservice_controller.go index 60e05fe..5b756ea 100644 --- a/controllers/deviceservice_controller.go +++ b/controllers/deviceservice_controller.go @@ -18,25 +18,32 @@ package controllers import ( "context" + "encoding/json" "fmt" - "github.com/edgexfoundry/go-mod-core-contracts/models" "github.com/go-logr/logr" + devicev1alpha1 "github.com/openyurtio/device-controller/api/v1alpha1" + clis "github.com/openyurtio/device-controller/clients" + edgeInterface "github.com/openyurtio/device-controller/clients" + edgexCli "github.com/openyurtio/device-controller/clients/edgex-foundry" + "github.com/openyurtio/device-controller/controllers/util" + + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4" + "sigs.k8s.io/cluster-api/util/conditions" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - - devicev1alpha1 "github.com/openyurtio/device-controller/api/v1alpha1" - clis "github.com/openyurtio/device-controller/clients" - coremetacli "github.com/openyurtio/device-controller/clients/core-metadata" ) // DeviceServiceReconciler reconciles a DeviceService object type DeviceServiceReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - *coremetacli.CoreMetaClient + Log logr.Logger + Scheme *runtime.Scheme + deviceServiceCli edgeInterface.DeviceServiceInterface + NodePool string } //+kubebuilder:rbac:groups=device.openyurt.io,resources=deviceservices,verbs=get;list;watch;create;update;patch;delete @@ -44,89 +51,205 @@ type DeviceServiceReconciler struct { //+kubebuilder:rbac:groups=device.openyurt.io,resources=deviceservices/finalizers,verbs=update func (r *DeviceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("deviceservice", req.NamespacedName) + log := r.Log.WithValues("deviceService", req.NamespacedName) var ds devicev1alpha1.DeviceService if err := r.Get(ctx, req.NamespacedName, &ds); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, client.IgnoreNotFound(err) } - _, err := r.GetDeviceServiceByName(ds.GetName()) - if err == nil { - log.Info( - "DeviceService already exists on EdgeX") + // If objects doesn't belong to the edge platform to which the controller is connected, the controller does not handle events for that object + if ds.Spec.NodePool != r.NodePool { return ctrl.Result{}, nil } - if !clis.IsNotFoundErr(err) { - log.Error(err, "fail to visit the EdgeX core-metatdata-service") + log.V(4).Info("Reconciling the DeviceService object", "DeviceService", ds.GetName()) + // Update deviceService conditions + defer func() { + conditions.SetSummary(&ds, + conditions.WithConditions( + devicev1alpha1.DeviceServiceSyncedCondition, devicev1alpha1.DeviceServiceManagingCondition), + ) + err := r.Status().Update(ctx, &ds) + if client.IgnoreNotFound(err) != nil { + log.Error(err, "update deviceService conditions failed", "deviceService") + } + }() + + // 1. Handle the deviceService deletion event + if err := r.reconcileDeleteDeviceService(ctx, &ds); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } else if !ds.ObjectMeta.DeletionTimestamp.IsZero() { return ctrl.Result{}, nil } - // 1. create the addressable - add := toEdgeXAddressable(ds.Spec.Addressable) - _, err = r.GetAddressableByName(add.Name) - if err == nil { - log.Info( - "Addressable already exists on EdgeX") - return ctrl.Result{}, nil + if ds.Status.Synced == false { + // 2. Synchronize OpenYurt deviceService to edge platform + if err := r.reconcileCreateDeviceService(ctx, &ds, log); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + return ctrl.Result{}, err + } + } + } else if ds.Spec.Managed == true { + // 3. If the deviceService has been synchronized and is managed by the cloud, reconcile the deviceService fields + if err := r.reconcileUpdateDeviceService(ctx, &ds, log); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + return ctrl.Result{}, err + } + } } - addrEdgeXId, err := r.AddAddressable(add) + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *DeviceServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { + coreMetaCliInfo := edgexCli.ClientURL{Host: "edgex-core-metadata", Port: 48081} + r.deviceServiceCli = edgexCli.NewEdgexDeviceServiceClient(coreMetaCliInfo, r.Log) + + nodePool, err := util.GetNodePool(mgr.GetConfig()) if err != nil { - return ctrl.Result{}, fmt.Errorf("Fail to add addressable to EdgeX: %v", err) + return err + } + r.NodePool = nodePool + + // register the filter field for deviceService + if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &devicev1alpha1.DeviceService{}, "spec.nodePool", func(rawObj client.Object) []string { + deviceService := rawObj.(*devicev1alpha1.DeviceService) + return []string{deviceService.Spec.NodePool} + }); err != nil { + return err } - log.V(4).Info("Successfully add the Addressable to EdgeX", - "Addressable", add.Name, "EdgeXId", addrEdgeXId) - ds.Spec.Addressable.Id = addrEdgeXId + return ctrl.NewControllerManagedBy(mgr). + For(&devicev1alpha1.DeviceService{}). + Complete(r) +} - // 2. create the DeviceService - dsEdgeXId, err := r.AddDeviceService(toEdgexDeviceService(ds)) +func (r *DeviceServiceReconciler) reconcileDeleteDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService) error { + // gets the actual name of deviceService on the edge platform from the Label of the device + edgeDeviceServiceName := ds.ObjectMeta.Labels[EdgeXObjectName] + if ds.ObjectMeta.DeletionTimestamp.IsZero() { + if len(ds.GetFinalizers()) == 0 { + patchString := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": []string{devicev1alpha1.DeviceServiceFinalizer}, + }, + } + if patchData, err := json.Marshal(patchString); err != nil { + return err + } else { + if err = r.Patch(ctx, ds, client.RawPatch(types.MergePatchType, patchData)); err != nil { + return err + } + } + } + } else { + patchString := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": []string{}, + }, + } + // delete the deviceService in OpenYurt + if patchData, err := json.Marshal(patchString); err != nil { + return err + } else { + if err = r.Patch(ctx, ds, client.RawPatch(types.MergePatchType, patchData)); err != nil { + return err + } + } + + // delete the deviceService object on edge platform + err := r.deviceServiceCli.Delete(nil, edgeDeviceServiceName, edgeInterface.DeleteOptions{}) + if err != nil && !clis.IsNotFoundErr(err) { + return err + } + } + return nil +} + +func (r *DeviceServiceReconciler) reconcileCreateDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService, log logr.Logger) error { + // get the actual name of deviceService on the Edge platform from the Label of the device + edgeDeviceName := ds.ObjectMeta.Labels[EdgeXObjectName] + log.V(4).Info("Checking if deviceService already exist on the edge platform", "deviceService", ds.GetName()) + // Checking if deviceService already exist on the edge platform + if edgeDs, err := r.deviceServiceCli.Get(nil, edgeDeviceName, edgeInterface.GetOptions{}); err != nil { + if !clis.IsNotFoundErr(err) { + log.V(4).Error(err, "fail to visit the edge platform") + return nil + } + } else { + // a. If object exists, the status of the device on OpenYurt is updated + log.V(4).Info("DeviceService already exists on edge platform") + ds.Status.Synced = true + ds.Status.EdgeId = edgeDs.Status.EdgeId + return r.Status().Update(ctx, ds) + } + + // b. If object does not exist, a request is sent to the edge platform to create a new deviceService and related addressable + addressable := ds.Spec.Addressable + as, err := r.deviceServiceCli.GetAddressable(nil, addressable.Name, edgeInterface.GetOptions{}) + if err == nil { + log.V(4).Info("Addressable already exists on edge platform") + ds.Spec.Addressable = *as + } else if clis.IsNotFoundErr(err) { + createdAddr, err := r.deviceServiceCli.CreateAddressable(nil, &addressable, edgeInterface.CreateOptions{}) + if err != nil { + conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to add addressable to EdgeX", clusterv1.ConditionSeverityWarning, err.Error()) + return fmt.Errorf("failed to add addressable to edge platform: %v", err) + } + log.V(4).Info("Successfully add the Addressable to edge platform", + "Addressable", addressable.Name, "EdgeId", createdAddr.Id) + ds.Spec.Addressable.Id = createdAddr.Id + } else { + log.V(4).Error(err, "fail to visit the edge platform core-metatdata-service") + conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to visit the EdgeX core-metadata-service", clusterv1.ConditionSeverityWarning, err.Error()) + return err + } + if err = r.Update(ctx, ds); err != nil { + return err + } + + createdDs, err := r.deviceServiceCli.Create(nil, ds, edgeInterface.CreateOptions{}) if err != nil { - return ctrl.Result{}, fmt.Errorf("Fail to add DeviceService to EdgeX: %v", err) + log.V(4).Error(err, "failed to create deviceService on edge platform") + conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to add DeviceService to EdgeX", clusterv1.ConditionSeverityWarning, err.Error()) + return fmt.Errorf("fail to add DeviceService to edge platform: %v", err) } - log.V(4).Info("Successfully add DeviceService to EdgeX", - "DeviceService", ds.GetName(), "EdgeXId", dsEdgeXId) - ds.Spec.Id = dsEdgeXId - ds.Status.AddedToEdgeX = true - return ctrl.Result{}, r.Update(ctx, &ds) + log.V(4).Info("Successfully add DeviceService to Edge Platform", + "DeviceService", ds.GetName(), "EdgeId", createdDs.Status.EdgeId) + ds.Status.EdgeId = createdDs.Status.EdgeId + ds.Status.Synced = true + conditions.MarkTrue(ds, devicev1alpha1.DeviceServiceSyncedCondition) + return r.Status().Update(ctx, ds) } -func toEdgexDeviceService(ds devicev1alpha1.DeviceService) models.DeviceService { - return models.DeviceService{ - DescribedObject: models.DescribedObject{ - Description: ds.Spec.Description, - }, - Name: ds.GetName(), - Id: ds.Spec.Id, - LastConnected: ds.Spec.LastConnected, - LastReported: ds.Spec.LastReported, - OperatingState: models.OperatingState(ds.Spec.OperatingState), - Labels: ds.Spec.Labels, - AdminState: models.AdminState(ds.Spec.AdminState), - Addressable: toEdgeXAddressable(ds.Spec.Addressable), +func (r *DeviceServiceReconciler) reconcileUpdateDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService, log logr.Logger) error { + // 1. reconciling the AdminState field of deviceService + newDeviceServiceStatus := ds.Status.DeepCopy() + updateDeviceService := ds.DeepCopy() + // do not update deviceService's OperatingState + updateDeviceService.Spec.OperatingState = "" + + if ds.Spec.AdminState != "" && ds.Spec.AdminState != ds.Status.AdminState { + newDeviceServiceStatus.AdminState = ds.Spec.AdminState + } else { + updateDeviceService.Spec.AdminState = "" } -} -func toEdgeXAddressable(a devicev1alpha1.Addressable) models.Addressable { - return models.Addressable{ - Id: a.Id, - Name: a.Name, - Protocol: a.Protocol, - HTTPMethod: a.HTTPMethod, - Address: a.Address, - Port: a.Port, - Path: a.Path, - Publisher: a.Publisher, - User: a.User, - Password: a.Password, - Topic: a.Topic, + _, err := r.deviceServiceCli.Update(nil, updateDeviceService, edgeInterface.UpdateOptions{}) + if err != nil { + conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceManagingCondition, "failed to update AdminState of deviceService on edge platform", clusterv1.ConditionSeverityWarning, err.Error()) + return err } -} -// SetupWithManager sets up the controller with the Manager. -func (r *DeviceServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.CoreMetaClient = coremetacli.NewCoreMetaClient( - "edgex-core-metadata.default", 48081, r.Log) - return ctrl.NewControllerManagedBy(mgr). - For(&devicev1alpha1.DeviceService{}). - Complete(r) + // 2. update the device status on OpenYurt + ds.Status = *newDeviceServiceStatus + if err = r.Status().Update(ctx, ds); err != nil { + conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceManagingCondition, "failed to update status of deviceService on openyurt", clusterv1.ConditionSeverityWarning, err.Error()) + return err + } + conditions.MarkTrue(ds, devicev1alpha1.DeviceServiceManagingCondition) + return nil } diff --git a/controllers/deviceservice_syncer.go b/controllers/deviceservice_syncer.go index db7a838..988af08 100644 --- a/controllers/deviceservice_syncer.go +++ b/controllers/deviceservice_syncer.go @@ -18,157 +18,235 @@ package controllers import ( "context" + "github.com/openyurtio/device-controller/controllers/util" + "k8s.io/client-go/rest" "strings" "time" - "github.com/edgexfoundry/go-mod-core-contracts/models" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/go-logr/logr" - devv1 "github.com/openyurtio/device-controller/api/v1alpha1" - coremetacli "github.com/openyurtio/device-controller/clients/core-metadata" + devicev1alpha1 "github.com/openyurtio/device-controller/api/v1alpha1" + iotcli "github.com/openyurtio/device-controller/clients" + edgexCli "github.com/openyurtio/device-controller/clients/edgex-foundry" + + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" + ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ) type DeviceServiceSyncer struct { - // syncing period in seconds - syncPeriod time.Duration - // EdgeX core-data-service's client - *coremetacli.CoreMetaClient // Kubernetes client client.Client - log logr.Logger + // syncing period in seconds + syncPeriod time.Duration + deviceServiceCli iotcli.DeviceServiceInterface + log logr.Logger + NodePool string } func NewDeviceServiceSyncer(client client.Client, logr logr.Logger, - periodSecs uint32) DeviceServiceSyncer { + periodSecs uint32, cfg *rest.Config) (DeviceServiceSyncer, error) { log := logr.WithName("syncer").WithName("DeviceService") - return DeviceServiceSyncer{ - syncPeriod: time.Duration(periodSecs) * time.Second, - CoreMetaClient: coremetacli.NewCoreMetaClient( - "edgex-core-metadata.default", 48081, log), - Client: client, - log: log, + coreMetaCliInfo := edgexCli.ClientURL{Host: "edgex-core-metadata", Port: 48081} + + nodePool, err := util.GetNodePool(cfg) + if err != nil { + return DeviceServiceSyncer{}, err } + return DeviceServiceSyncer{ + syncPeriod: time.Duration(periodSecs) * time.Second, + deviceServiceCli: edgexCli.NewEdgexDeviceServiceClient(coreMetaCliInfo, logr), + Client: client, + log: log, + NodePool: nodePool, + }, nil } -func (dss *DeviceServiceSyncer) NewDeviceServiceSyncerRunnable() ctrlmgr.RunnableFunc { +func (ds *DeviceServiceSyncer) NewDeviceServiceSyncerRunnable() ctrlmgr.RunnableFunc { return func(ctx context.Context) error { - dss.Run(ctx.Done()) + ds.Run(ctx.Done()) return nil } } func (ds *DeviceServiceSyncer) Run(stop <-chan struct{}) { - ds.log.Info("starting the DeviceServiceSyncer...") + ds.log.V(1).Info("starting the DeviceServiceSyncer...") go func() { for { <-time.After(ds.syncPeriod) - // list deviceservice on edgex foundry - eDevs, err := ds.ListDeviceServices() + // 1. get deviceServices on edge platform and OpenYurt + edgeDeviceServices, kubeDeviceServices, err := ds.getAllDeviceServices() if err != nil { - ds.log.Error(err, "fail to list the deviceservice object on the EdgeX Foundry") + ds.log.V(3).Error(err, "fail to list the deviceServices") continue } - // list deviceservice on Kubernetes - var kDevs devv1.DeviceServiceList - if err := ds.List(context.TODO(), &kDevs); err != nil { - ds.log.Error(err, "fail to list the deviceservice object on the Kubernetes") + + // 2. find the deviceServices that need to be synchronized + redundantEdgeDeviceServices, redundantKubeDeviceServices, syncedDeviceServices := + ds.findDiffDeviceServices(edgeDeviceServices, kubeDeviceServices) + ds.log.V(1).Info("The number of deviceServices waiting for synchronization", + "Edge deviceServices should be added to OpenYurt", len(redundantEdgeDeviceServices), + "OpenYurt deviceServices that should be deleted", len(redundantKubeDeviceServices), + "DeviceServices that should be synchronized", len(syncedDeviceServices)) + + // 3. create deviceServices on OpenYurt which are exists in edge platform but not in OpenYurt + if err := ds.syncEdgeToKube(redundantEdgeDeviceServices); err != nil { + ds.log.V(3).Error(err, "fail to create deviceServices on OpenYurt") continue } - // create the deviceservice on Kubernetes but not on EdgeX - newKDevs := findNewDeviceService(eDevs, kDevs.Items) - if len(newKDevs) != 0 { - if err := createDeviceService(ds.log, ds.Client, newKDevs); err != nil { - ds.log.Error(err, "fail to create deviceservice") - continue - } + + // 4. delete redundant deviceServices on OpenYurt + if err := ds.deleteDeviceServices(redundantKubeDeviceServices); err != nil { + ds.log.V(3).Error(err, "fail to delete redundant deviceServices on OpenYurt") + continue + } + + // 5. update deviceService status on OpenYurt + if err := ds.updateDeviceServices(syncedDeviceServices); err != nil { + ds.log.Error(err, "fail to update deviceServices") + continue } - ds.log.V(5).Info("new deviceservice not found") + + ds.log.V(1).Info("One round of DeviceService synchronization is complete") } }() <-stop - ds.log.Info("stopping the device syncer") + ds.log.V(1).Info("stopping the deviceService syncer") } -// findNewDeviceService finds devices that have been created on the EdgeX but -// not the Kubernetes -func findNewDeviceService( - edgeXDevs []models.DeviceService, - kubeDevs []devv1.DeviceService) []models.DeviceService { - var retDevs []models.DeviceService - for _, exd := range edgeXDevs { - var exist bool - for _, kd := range kubeDevs { - if strings.ToLower(exd.Name) == kd.Name { - exist = true - break - } - } - if !exist { - retDevs = append(retDevs, exd) +// Get the existing DeviceService on the Edge platform, as well as OpenYurt existing DeviceService +// edgeDeviceServices:map[actualName]DeviceService +// kubeDeviceServices:map[actualName]DeviceService +func (ds *DeviceServiceSyncer) getAllDeviceServices() ( + map[string]devicev1alpha1.DeviceService, map[string]devicev1alpha1.DeviceService, error) { + + edgeDeviceServices := map[string]devicev1alpha1.DeviceService{} + kubeDeviceServices := map[string]devicev1alpha1.DeviceService{} + + // 1. list deviceServices on edge platform + eDevSs, err := ds.deviceServiceCli.List(nil, iotcli.ListOptions{}) + if err != nil { + ds.log.V(4).Error(err, "fail to list the deviceServices object on the edge platform") + return edgeDeviceServices, kubeDeviceServices, err + } + // 2. list deviceServices on OpenYurt (filter objects belonging to edgeServer) + var kDevSs devicev1alpha1.DeviceServiceList + listOptions := client.MatchingFields{"spec.nodePool": ds.NodePool} + if err = ds.List(context.TODO(), &kDevSs, listOptions); err != nil { + ds.log.V(4).Error(err, "fail to list the deviceServices object on the Kubernetes") + return edgeDeviceServices, kubeDeviceServices, err + } + for i := range eDevSs { + deviceServicesName := eDevSs[i].Labels[EdgeXObjectName] + edgeDeviceServices[deviceServicesName] = eDevSs[i] + } + + for i := range kDevSs.Items { + deviceServicesName := kDevSs.Items[i].Labels[EdgeXObjectName] + kubeDeviceServices[deviceServicesName] = kDevSs.Items[i] + } + return edgeDeviceServices, kubeDeviceServices, nil +} + +// Get the list of deviceServices that need to be added, deleted and updated +func (ds *DeviceServiceSyncer) findDiffDeviceServices( + edgeDeviceService map[string]devicev1alpha1.DeviceService, kubeDeviceService map[string]devicev1alpha1.DeviceService) ( + redundantEdgeDeviceServices map[string]*devicev1alpha1.DeviceService, redundantKubeDeviceServices map[string]*devicev1alpha1.DeviceService, syncedDeviceServices map[string]*devicev1alpha1.DeviceService) { + + redundantEdgeDeviceServices = map[string]*devicev1alpha1.DeviceService{} + redundantKubeDeviceServices = map[string]*devicev1alpha1.DeviceService{} + syncedDeviceServices = map[string]*devicev1alpha1.DeviceService{} + + for n, v := range edgeDeviceService { + edName := v.Labels[EdgeXObjectName] + if _, exists := kubeDeviceService[edName]; !exists { + ed := edgeDeviceService[n] + redundantEdgeDeviceServices[edName] = ds.completeCreateContent(&ed) + } else { + kd := kubeDeviceService[edName] + ed := edgeDeviceService[n] + syncedDeviceServices[edName] = ds.completeUpdateContent(&kd, &ed) } } - return retDevs + for k, v := range kubeDeviceService { + if !v.Status.Synced { + continue + } + kdName := v.Labels[EdgeXObjectName] + if _, exists := edgeDeviceService[kdName]; !exists { + kd := kubeDeviceService[k] + redundantKubeDeviceServices[kdName] = &kd + } + } + return } -// createDeviceService creates the list of devices -func createDeviceService(log logr.Logger, cli client.Client, edgeXDevs []models.DeviceService) error { - for _, ed := range edgeXDevs { - kd := toKubeDeviceService(ed) - if err := cli.Create(context.TODO(), &kd); err != nil { +// syncEdgeToKube creates deviceServices on OpenYurt which are exists in edge platform but not in OpenYurt +func (ds *DeviceServiceSyncer) syncEdgeToKube(edgeDevs map[string]*devicev1alpha1.DeviceService) error { + for _, ed := range edgeDevs { + if err := ds.Client.Create(context.TODO(), ed); err != nil { if apierrors.IsAlreadyExists(err) { - log.Info("DeviceService already exist on Kubernetes", - "deviceservice", strings.ToLower(ed.Name)) + ds.log.V(5).Info("DeviceService already exist on Kubernetes", + "DeviceService", strings.ToLower(ed.Name)) continue } - log.Error(err, "fail to create the DeviceService on Kubernetes", - "deviceservice", ed.Name) + ds.log.Info("created deviceService failed:", + "DeviceService", strings.ToLower(ed.Name)) return err } } return nil } -func toKubeDeviceService(ds models.DeviceService) devv1.DeviceService { - return devv1.DeviceService{ - ObjectMeta: metav1.ObjectMeta{ - Name: strings.ToLower(ds.Name), - Namespace: "default", - Labels: map[string]string{ - EdgeXObjectName: ds.Name, - }, - }, - Spec: devv1.DeviceServiceSpec{ - Description: ds.Description, - Id: ds.Id, - LastConnected: ds.LastConnected, - LastReported: ds.LastReported, - OperatingState: toKubeOperatingState(ds.OperatingState), - Labels: ds.Labels, - Addressable: toKubeAddressable(ds.Addressable), - AdminState: toKubeAdminState(ds.AdminState), - }, +// deleteDeviceServices deletes redundant deviceServices on OpenYurt +func (ds *DeviceServiceSyncer) deleteDeviceServices(redundantKubeDeviceServices map[string]*devicev1alpha1.DeviceService) error { + for i := range redundantKubeDeviceServices { + if err := ds.Client.Delete(context.TODO(), redundantKubeDeviceServices[i]); err != nil { + ds.log.V(5).Error(err, "fail to delete the DeviceService on Kubernetes", + "DeviceService", redundantKubeDeviceServices[i].Name) + return err + } } + return nil } -func toKubeAddressable(ad models.Addressable) devv1.Addressable { - return devv1.Addressable{ - Id: ad.Id, - Name: ad.Name, - Protocol: ad.Protocol, - HTTPMethod: ad.HTTPMethod, - Address: ad.Address, - Port: ad.Port, - Path: ad.Path, - Publisher: ad.Publisher, - User: ad.User, - Password: ad.Password, - Topic: ad.Topic, +// updateDeviceServicesStatus updates deviceServices status on OpenYurt +func (ds *DeviceServiceSyncer) updateDeviceServices(syncedDeviceServices map[string]*devicev1alpha1.DeviceService) error { + for _, sd := range syncedDeviceServices { + if sd.ObjectMeta.ResourceVersion == "" { + continue + } + if err := ds.Client.Status().Update(context.TODO(), sd); err != nil { + if apierrors.IsConflict(err) { + ds.log.V(5).Info("update Conflicts", + "DeviceService", sd.Name) + continue + } + ds.log.V(5).Error(err, "fail to update the DeviceService on Kubernetes", + "DeviceService", sd.Name) + return err + } } + return nil +} + +// completeCreateContent completes the content of the deviceService which will be created on OpenYurt +func (ds *DeviceServiceSyncer) completeCreateContent(edgeDS *devicev1alpha1.DeviceService) *devicev1alpha1.DeviceService { + createDevice := edgeDS.DeepCopy() + createDevice.Spec.NodePool = ds.NodePool + createDevice.Name = strings.Join([]string{ds.NodePool, createDevice.Name}, "-") + createDevice.Spec.Managed = false + return createDevice +} + +// completeUpdateContent completes the content of the deviceService which will be updated on OpenYurt +func (ds *DeviceServiceSyncer) completeUpdateContent(kubeDS *devicev1alpha1.DeviceService, edgeDS *devicev1alpha1.DeviceService) *devicev1alpha1.DeviceService { + updatedDS := kubeDS.DeepCopy() + // update device status + updatedDS.Status.LastConnected = edgeDS.Status.LastConnected + updatedDS.Status.LastReported = edgeDS.Status.LastReported + updatedDS.Status.AdminState = edgeDS.Status.AdminState + return updatedDS } diff --git a/go.mod b/go.mod index d2da965..8833e93 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,12 @@ go 1.15 require ( github.com/edgexfoundry/go-mod-core-contracts v0.1.111 - github.com/go-logr/logr v0.3.0 + github.com/go-logr/logr v0.4.0 github.com/go-resty/resty/v2 v2.4.0 - github.com/onsi/ginkgo v1.14.1 - github.com/onsi/gomega v1.10.2 - k8s.io/api v0.19.2 - k8s.io/apimachinery v0.19.2 - k8s.io/client-go v0.19.2 - sigs.k8s.io/controller-runtime v0.7.0 + github.com/onsi/ginkgo v1.16.4 + github.com/onsi/gomega v1.14.0 + k8s.io/apimachinery v0.21.3 + k8s.io/client-go v0.21.3 + sigs.k8s.io/cluster-api v0.4.2 + sigs.k8s.io/controller-runtime v0.9.6 ) diff --git a/main.go b/main.go index 823939e..f9d3523 100644 --- a/main.go +++ b/main.go @@ -124,7 +124,11 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DeviceService") os.Exit(1) } - dss := controllers.NewDeviceServiceSyncer(mgr.GetClient(), mgr.GetLogger(), EdgeXSyncPeriodSecs) + dss, err := controllers.NewDeviceServiceSyncer(mgr.GetClient(), mgr.GetLogger(), EdgeXSyncPeriodSecs, mgr.GetConfig()) + if err != nil { + setupLog.Error(err, "unable to create syncer", "syncer", "DeviceService") + os.Exit(1) + } mgr.Add(dss.NewDeviceServiceSyncerRunnable()) setupLog.Info("add device service syncer") //+kubebuilder:scaffold:builder