Skip to content

Commit

Permalink
feature: bcs-mesos-driver register to bcs-api with websocket, to impl…
Browse files Browse the repository at this point in the history
…ement service register and discovery accross clouds. issue TencentBlueKing#415
  • Loading branch information
bryanhe-bupt committed Mar 17, 2020
1 parent b07166a commit c95cbe5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 1 deletion.
10 changes: 10 additions & 0 deletions bcs-mesos/bcs-mesos-driver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type MesosDriverOptionsOut struct {
Cluster string `json:"cluster" value:"" usage:"the cluster ID under bcs"`
AdmissionWebhook bool `json:"admission_webhook" value:"false" usage:"whether admission webhook"`
KubeConfig string `json:"kubeconfig" value:"" usage:"kube config for custom resource feature and etcd storage"`

// websocket register
RegisterWithWebsocket bool `json:"register-with-websocket" value:"false" usage:"whether register to bcs-api with websocket"`
RegisterToken string `json:"register-token" value:"" usage:"register token to register to bcs-api"`
RegisterUrl string `json:"register-url" value:"" usage:"bcs-api url to register"`
InsecureSkipVerify bool `json:"insecure-skip-verify" value:"false" usage:"whether insecure skip verify"`
}

//MesosDriverOption is option in flags
Expand Down Expand Up @@ -70,6 +76,10 @@ func NewMesosDriverOption(opOut *MesosDriverOptionsOut) *MesosDriverOption {
CertPasswd: static.ClientCertPwd,
IsSSL: false,
},
RegisterWithWebsocket: opOut.RegisterWithWebsocket,
RegisterToken: opOut.RegisterToken,
RegisterUrl: opOut.RegisterUrl,
InsecureSkipVerify: opOut.InsecureSkipVerify,
},
}
}
6 changes: 6 additions & 0 deletions bcs-mesos/bcs-mesos-driver/mesosdriver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ type MesosDriverConfig struct {
AdmissionWebhook bool
//KubeConfig kubeconfig for CustomResource
KubeConfig string

// websocket register
RegisterWithWebsocket bool
RegisterToken string
RegisterUrl string
InsecureSkipVerify bool
}
12 changes: 11 additions & 1 deletion bcs-mesos/bcs-mesos-driver/mesosdriver/mesosdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,17 @@ func (m *MesosDriver) Start() error {
m.RunMetric()

go m.DiscvScheduler()
go m.RegDiscover()

if m.config.RegisterWithWebsocket {
err := m.buildWebsocketToApi()
if err != nil {
blog.Fatalf("err when register with websocket: %s", err.Error())
os.Exit(1)
}
} else {
go m.RegDiscover()
}

go m.registerMesosZkEndpoints()

chErr := make(chan error, 1)
Expand Down
112 changes: 112 additions & 0 deletions bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package mesosdriver

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/url"
"time"

"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-common/common/websocketDialer"
)

const (
Module = "BCS-API-Tunnel-Module"
RegisterToken = "BCS-API-Tunnel-Token"
Params = "BCS-API-Tunnel-Params"
Cluster = "BCS-API-Tunnel-ClusterId"
ModuleName = "mesos-driver"
)

func (m *MesosDriver) buildWebsocketToApi() error {
if m.config.RegisterUrl == "" {
return errors.New("register url is empty")
}
bcsApiUrl, err := url.Parse(m.config.RegisterUrl)
if err != nil {
return err
}

if m.config.RegisterToken == "" {
return errors.New("register token is empty")
}
if m.config.Cluster == "" {
return errors.New("clusterid is empty")
}

var serverAddress string
if m.config.ServCert.IsSSL {
serverAddress = fmt.Sprintf("https://%s:%d", m.config.Address, m.config.Port)
} else {
serverAddress = fmt.Sprintf("http://%s:%d", m.config.Address, m.config.Port)
}

params := map[string]interface{}{
"address": serverAddress,
}
bytes, err := json.Marshal(params)
if err != nil {
return err
}
headers := map[string][]string{
Module: {ModuleName},
Cluster: {m.config.Cluster},
RegisterToken: {m.config.RegisterToken},
Params: {base64.StdEncoding.EncodeToString(bytes)},
}

var tlsConfig *tls.Config
if m.config.InsecureSkipVerify {
tlsConfig = &tls.Config{InsecureSkipVerify: true}
} else {
// use bcs cacert
pool := x509.NewCertPool()
ca, err := ioutil.ReadFile(m.config.ClientCert.CAFile)
if err != nil {
return err
}
if ok := pool.AppendCertsFromPEM(ca); ok != true {
return fmt.Errorf("append ca cert failed")
}
tlsConfig = &tls.Config{RootCAs: pool}
}

go func() {
for {
wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bcsApiUrl.Host)
blog.Infof("Connecting to %s with token %s", wsURL, m.config.RegisterToken)

websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool {
switch proto {
case "tcp":
return true
case "unix":
return address == "/var/run/docker.sock"
}
return false
})
time.Sleep(5 * time.Second)
}
}()

return nil
}

0 comments on commit c95cbe5

Please sign in to comment.