Skip to content

Commit

Permalink
feature: bcs-kube-agent register to bcs-api with websocket, to implem…
Browse files Browse the repository at this point in the history
…ent service register and discovery accross clouds. issue TencentBlueKing#414
  • Loading branch information
bryanhe-bupt committed Mar 17, 2020
1 parent 3a0e460 commit b07166a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 2 deletions.
12 changes: 10 additions & 2 deletions bcs-k8s/bcs-kube-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,18 @@ func Run() error {

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return fmt.Errorf("Error building kubernetes clientset: %s", err.Error())
return fmt.Errorf("error building kubernetes clientset: %s", err.Error())
}

go reportToBke(kubeClient, cfg)
useWebsocket := viper.GetBool("agent.use-websocket")
if useWebsocket {
err := buildWebsocketToBke(cfg)
if err != nil {
return err
}
} else {
go reportToBke(kubeClient, cfg)
}

// TODO: Add prometheus monitor metrics
http.Handle("/metrics", promhttp.Handler())
Expand Down
122 changes: 122 additions & 0 deletions bcs-k8s/bcs-kube-agent/app/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package app

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"net/url"
"os"
"time"

"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-common/common/websocketDialer"
"github.com/spf13/viper"
"k8s.io/client-go/rest"
)

const (
kubernetesServiceHost = "KUBERNETES_SERVICE_HOST"
kubernetesServicePort = "KUBERNETES_SERVICE_PORT"

Module = "BCS-API-Tunnel-Module"
RegisterToken = "BCS-API-Tunnel-Token"
Params = "BCS-API-Tunnel-Params"
Cluster = "BCS-API-Tunnel-ClusterId"

ModuleName = "kube-agent"
)

func getenv(env string) (string, error) {
value := os.Getenv(env)
if value == "" {
return "", fmt.Errorf("%s is empty", env)
}
return value, nil
}

func buildWebsocketToBke(cfg *rest.Config) error {
bkeServerAddress := viper.GetString("bke.serverAddress")
clusterId := viper.GetString("cluster.id")
registerToken := os.Getenv("REGISTER_TOKEN")

bkeServerUrl, err := url.Parse(bkeServerAddress)
if err != nil {
return err
}

if err := populateCAData(cfg); err != nil {
return fmt.Errorf("error populating ca data: %s", err.Error())
}

kubernetesServiceHost, err := getenv(kubernetesServiceHost)
if err != nil {
return err
}
kubernetesServicePort, err := getenv(kubernetesServicePort)
if err != nil {
return err
}
params := map[string]interface{}{
"address": fmt.Sprintf("https://%s:%s", kubernetesServiceHost, kubernetesServicePort),
"userToken": cfg.BearerToken,
"caCert": base64.StdEncoding.EncodeToString(cfg.CAData),
}
bytes, err := json.Marshal(params)
if err != nil {
return err
}

headers := map[string][]string{
Module: {ModuleName},
Cluster: {clusterId},
RegisterToken: {registerToken},
Params: {base64.StdEncoding.EncodeToString(bytes)},
}

var tlsConfig *tls.Config
insecureSkipVerify := viper.GetBool("agent.insecureSkipVerify")
if insecureSkipVerify {
tlsConfig = &tls.Config{InsecureSkipVerify: insecureSkipVerify}
} else {
pool := x509.NewCertPool()
caCrtStr := os.Getenv("SERVER_CERT")
caCrt := []byte(caCrtStr)
pool.AppendCertsFromPEM(caCrt)
tlsConfig = &tls.Config{RootCAs: pool}
}

go func() {
for {
wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bkeServerUrl.Host)
blog.Infof("Connecting to %s with token %s", wsURL, registerToken)

websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool {
switch proto {
case "tcp":
return true
case "unix":
return address == "/var/run/docker.sock"
}
return false
})
time.Sleep(5 * time.Second)
}
}()

return nil
}
4 changes: 4 additions & 0 deletions bcs-k8s/bcs-kube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
insecureSkipVerify bool
// 外网跨云部署时,需要上报的api-server的公网或代理地址
ExternalProxyAddresses string
// 是否使用 websocket 进行注册
useWebsocket bool
)

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -76,6 +78,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&clusterId, "cluster-id", "", "cluster which the agent run in")
rootCmd.PersistentFlags().BoolVar(&insecureSkipVerify, "insecureSkipVerify", false, "verifies the server's certificate chain and host name")
rootCmd.PersistentFlags().StringVar(&ExternalProxyAddresses, "external-proxy-addresses", "", "external proxy addresses of apiserver, separated by semicolon")
rootCmd.PersistentFlags().BoolVar(&useWebsocket, "use-websocket", false, "whether use websocket to register to bcs-api")
// these three flag support direct flag and viper config at the same time, the direct flag could cover the viper config.
viper.BindPFlag("agent.kubeconfig", rootCmd.PersistentFlags().Lookup("kubeconfig"))
viper.BindPFlag("agent.periodSync", rootCmd.PersistentFlags().Lookup("periodsync"))
Expand All @@ -84,6 +87,7 @@ func init() {
viper.BindPFlag("cluster.id", rootCmd.PersistentFlags().Lookup("cluster-id"))
viper.BindPFlag("agent.insecureSkipVerify", rootCmd.PersistentFlags().Lookup("insecureSkipVerify"))
viper.BindPFlag("agent.external-proxy-addresses", rootCmd.PersistentFlags().Lookup("external-proxy-addresses"))
viper.BindPFlag("agent.use-websocket", rootCmd.PersistentFlags().Lookup("use-websocket"))
}

func initConfig() {
Expand Down

0 comments on commit b07166a

Please sign in to comment.