Skip to content

Commit

Permalink
Merge pull request #939 from AlexAi27/dev_storage_client
Browse files Browse the repository at this point in the history
storage client etcd服务发现 && mongo-driver 1.5版本插入索引报错
  • Loading branch information
DeveloperJim authored Jul 1, 2021
2 parents 3e12717 + 80632c9 commit c2057bb
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 26 deletions.
4 changes: 2 additions & 2 deletions bcs-common/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ require (
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/ugorji/go/codec v1.2.3
go.mongodb.org/mongo-driver v1.4.5
go.mongodb.org/mongo-driver v1.5.3
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.27.0
gopkg.in/inf.v0 v0.9.1 // indirect
moul.io/http2curl v1.0.0 // indirect
)
)
3 changes: 3 additions & 0 deletions bcs-common/pkg/bcsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"

cm "github.com/Tencent/bk-bcs/bcs-common/pkg/bcsapi/clustermanager"
"github.com/Tencent/bk-bcs/bcs-common/pkg/registry"
)

//! v4 version binding~
Expand All @@ -39,6 +40,8 @@ type Config struct {
ClusterID string
// proxy flag for go through bcs-api-gateway
Gateway bool
// etcd registry config for bcs modules
Etcd registry.CMDOptions
}

// BasicResponse basic http response for bkbcs
Expand Down
142 changes: 140 additions & 2 deletions bcs-common/pkg/bcsapi/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ package bcsapi
import (
"encoding/json"
"fmt"
"strings"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-common/common/types"
"github.com/Tencent/bk-bcs/bcs-common/common/version"
"github.com/Tencent/bk-bcs/bcs-common/pkg/bcsapi/storage"
restclient "github.com/Tencent/bk-bcs/bcs-common/pkg/esb/client"
"github.com/Tencent/bk-bcs/bcs-common/pkg/odm/drivers"
"github.com/Tencent/bk-bcs/bcs-common/pkg/registry"
)

const (
customResourcePath = "bcsstorage/v1/dynamic/customresources/%s"
customResourceIndexPath = "bcsstorage/v1/dynamic/customresources/%s/index/%s"
)

// Storage interface definition for bcs-storage
Expand All @@ -31,6 +40,17 @@ type Storage interface {
QueryK8SPod(cluster string) ([]*storage.Pod, error)
// GetIPPoolDetailInfo get all underlay ip information
GetIPPoolDetailInfo(clusterID string) ([]*storage.IPPool, error)
// ListCustomResource list custom resources, Unmarshalled to dest.
// dest should be a pointer to a struct of map[string]interface{}
ListCustomResource(resourceType string, filter map[string]string, dest interface{}) error
// PutCustomResource put custom resources, support map or struct
PutCustomResource(resourceType string, data interface{}) error
// DeleteCustomResource delete custom resources, data is resource filter
DeleteCustomResource(resourceType string, data map[string]string) error
// CreateCustomResourceIndex create custom resources' index
CreateCustomResourceIndex(resourceType string, index drivers.Index) error
// DeleteCustomResourceIndex delete custom resources' index
DeleteCustomResourceIndex(resourceType string, indexName string) error
}

// NewStorage create bcs-storage api implementation
Expand All @@ -43,13 +63,21 @@ func NewStorage(config *Config) Storage {
} else {
c.Client = restclient.NewRESTClient()
}
if c.Config.Etcd.Feature {
err := c.watchEndpoints()
if err != nil {
blog.Errorf("watch etcd of service storage failed: %s", err.Error())
return nil
}
}
return c
}

// StorageCli bcsf-storage client implementation
type StorageCli struct {
Config *Config
Client *restclient.RESTClient
Config *Config
Client *restclient.RESTClient
discover registry.Registry
}

// getRequestPath get storage query URL prefix
Expand Down Expand Up @@ -150,3 +178,113 @@ func (c *StorageCli) GetIPPoolDetailInfo(clusterID string) ([]*storage.IPPool, e
}
return detailResponse[0].Datas, nil
}

// ListCustomResource list custom resources, dest should be corresponding resource type or map[string]interface{}
func (c *StorageCli) ListCustomResource(resourceType string, filter map[string]string, dest interface{}) error {
err := bkbcsSetting(c.Client.Get(), c.Config).
WithEndpoints(c.Config.Hosts).
WithBasePath("/").
SubPathf(customResourcePath, resourceType).
WithParams(filter).
Do().
Into(dest)
if err != nil {
return err
}
return nil
}

// PutCustomResource put cluster resource
func (c *StorageCli) PutCustomResource(resourceType string, data interface{}) error {
resp := bkbcsSetting(c.Client.Put(), c.Config).
WithEndpoints(c.Config.Hosts).
WithBasePath("/").
SubPathf(customResourcePath, resourceType).
WithJSON(data).
Do()
if resp.Err != nil {
return resp.Err
}
return nil
}

// DeleteCustomResource delete custom resource
func (c *StorageCli) DeleteCustomResource(resourceType string, data map[string]string) error {
resp := bkbcsSetting(c.Client.Delete(), c.Config).
WithEndpoints(c.Config.Hosts).
WithBasePath("/").
SubPathf(customResourcePath, resourceType).
WithParams(data).
Do()
if resp.Err != nil {
return resp.Err
}
return nil
}

// CreateCustomResourceIndex create custom resource index
func (c *StorageCli) CreateCustomResourceIndex(resourceType string, index drivers.Index) error {
resp := bkbcsSetting(c.Client.Put(), c.Config).
WithEndpoints(c.Config.Hosts).
WithBasePath("/").
SubPathf(customResourceIndexPath, resourceType, index.Name).
WithJSON(index.Key).
Do()
if resp.Err != nil {
return resp.Err
}
return nil
}

// DeleteCustomResourceIndex delete custom resource index
func (c *StorageCli) DeleteCustomResourceIndex(resourceType string, indexName string) error {
resp := bkbcsSetting(c.Client.Delete(), c.Config).
WithEndpoints(c.Config.Hosts).
WithBasePath("/").
SubPathf(customResourceIndexPath, resourceType, indexName).
Do()
if resp.Err != nil {
return resp.Err
}
return nil
}

func (c *StorageCli) watchEndpoints() error {
tlsconf, err := c.Config.Etcd.GetTLSConfig()
if err != nil {
blog.Errorf("Get tlsconfig for etcd failed: %s, ca: %s, cert: %s, key:%s",
err.Error(), c.Config.Etcd.CA, c.Config.Etcd.Cert, c.Config.Etcd.Key)
return err
}
options := &registry.Options{
RegistryAddr: strings.Split(c.Config.Etcd.Address, ","),
Name: types.BCS_MODULE_STORAGE + "bkbcs.tencent.com",
Version: version.BcsVersion,
Config: tlsconf,
EvtHandler: c.handlerEtcdEvent,
}
c.discover = registry.NewEtcdRegistry(options)
if c.discover == nil {
blog.Errorf("NewEtcdRegistry for service (%s) discovery failed", types.BCS_MODULE_STORAGE)
return fmt.Errorf("NewEtcdRegistry for service (%s) discovery failed", types.BCS_MODULE_STORAGE)
}
c.handlerEtcdEvent(options.Name)
return nil
}

func (c *StorageCli) handlerEtcdEvent(svcName string) {
svc, err := c.discover.Get(svcName)
if err != nil {
blog.Errorf("Get svc %s from etcd registry failed: %s", svcName, err.Error())
return
}
if len(svc.Nodes) == 0 {
blog.Warnf("Non service found from etcd named %s", svcName)
}
endpoints := make([]string, 0)
for _, node := range svc.Nodes {
endpoints = append(endpoints, node.Address)
}
c.Config.Hosts = endpoints
blog.V(3).Infof("%d endpoints found for service %s in etcd registry: %+v", len(endpoints), svcName, endpoints)
}
38 changes: 38 additions & 0 deletions bcs-common/pkg/bcsapi/storage/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package storage

import "time"

// CommonResponseHeader is common response for storage resource api
type CommonResponseHeader struct {
Result bool `json:"result"`
Code int `json:"code"`
Message string `json:"message"`
PageSize int64 `json:"pageSize"`
Offset int64 `json:"offset"`
Total int64 `json:"total"`
}

// CommonDataHeader is common header for storage dynamic data api
type CommonDataHeader struct {
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
UpdateTime time.Time `json:"updateTime"`
ID string `json:"_id"`
ResourceType string `json:"resourceType"`
ClusterID string `json:"clusterId"`
IsBcsObjectDeleted bool `json:"_isBcsObjectDeleted"`
CreateTime time.Time `json:"createTime"`
}
32 changes: 16 additions & 16 deletions bcs-common/pkg/bcsapi/storage/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@ import (

// Pod definition in mongodb
type Pod struct {
ID string `json:"_id"`
ResourceName string `json:"resourceName"`
ResourceType string `json:"resourceType"`
Namespace string `json:"namespace"`
ClusterID string `json:"clusterId"`
CreateTime string `json:"createTime"`
UpdateTime string `json:"updateTime"`
Data *core.Pod `json:"data"`
CommonDataHeader
Data *core.Pod `json:"data"`
}

// Taskgroup bcs-storage taskgroup data of mesos
type Taskgroup struct {
ID string `json:"_id"`
ResourceName string `json:"resourceName"`
ResourceType string `json:"resourceType"`
Namespace string `json:"namespace"`
ClusterID string `json:"clusterId"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
Data *mesostype.BcsPodStatus `json:"data"`
CommonDataHeader
Data *mesostype.BcsPodStatus `json:"data"`
}

// PodList is response for storage pod list operation
type PodList struct {
CommonResponseHeader
Data []Pod `json:"data"`
}

// TaskgroupList is response for storage taskgroup list operation
type TaskgroupList struct {
CommonResponseHeader
Data []Taskgroup `json:"data"`
}
9 changes: 5 additions & 4 deletions bcs-common/pkg/odm/drivers/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ import (
"time"

"github.com/Tencent/bk-bcs/bcs-common/pkg/odm/operator"
"go.mongodb.org/mongo-driver/bson"
)

// Index index for database table
type Index struct {
Key map[string]int32 `json:"key" bson:"key"`
Name string `json:"name" bson:"name"`
Unique bool `json:"unique" bson:"unique"`
Background bool `json:"background" bson:"background"`
Key bson.D `json:"key" bson:"key"`
Name string `json:"name" bson:"name"`
Unique bool `json:"unique" bson:"unique"`
Background bool `json:"background" bson:"background"`
}

// DB interface for database
Expand Down
3 changes: 1 addition & 2 deletions bcs-common/pkg/odm/drivers/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package mongo
import (
"context"
"fmt"
"strings"
"time"

"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -309,7 +308,7 @@ func (c *Collection) Insert(ctx context.Context, docs []interface{}) (int, error
}()
ret, err = c.mCli.Database(c.dbName).Collection(c.collectionName).InsertMany(ctx, docs)
if err != nil {
if strings.Contains(err.Error(), "E11000 duplicate key") {
if mongo.IsDuplicateKeyError(err) {
return len(ret.InsertedIDs), drivers.ErrTableRecordDuplicateKey
}
return len(ret.InsertedIDs), err
Expand Down

0 comments on commit c2057bb

Please sign in to comment.