Skip to content

Commit

Permalink
feature: 日志采集支持非webhook方案 TencentBlueKing#432
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Apr 26, 2020
1 parent 0f1a55c commit eb21565
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 74 deletions.
106 changes: 39 additions & 67 deletions bcs-services/bcs-logbeat-sidecar/sidecar/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
package sidecar

import (
"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-services/bcs-logbeat-sidecar/config"
"bk-bcs/bcs-services/bcs-logbeat-sidecar/types"
bkbcsv1 "bk-bcs/bcs-services/bcs-webhook-server/pkg/client/listers/bk-bcs/v1"
"fmt"
"io/ioutil"
"os"
"strconv"
"sync"
"time"

"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-services/bcs-logbeat-sidecar/config"
"bk-bcs/bcs-services/bcs-logbeat-sidecar/types"
bkbcsv1 "bk-bcs/bcs-services/bcs-webhook-server/pkg/client/listers/bk-bcs/v1"

"github.com/fsouza/go-dockerclient"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -61,7 +59,7 @@ type SidecarController struct {
type ContainerLogConf struct {
containerId string
confPath string
needCollect bool
//needCollect bool
}

type LogConfParameter struct {
Expand Down Expand Up @@ -101,15 +99,12 @@ func NewSidecarController(conf *config.Config) (*SidecarController, error) {
if err != nil {
return nil, err
}

//sync logconfig file
s.syncLogConfs()
return s, nil
}

func (s *SidecarController) Start() {
go s.listenerDockerEvent()
go s.tickerSyncContainerLogConfs()
//go s.tickerSyncContainerLogConfs()
}

//start listen docker api event
Expand Down Expand Up @@ -148,25 +143,16 @@ func (s *SidecarController) listenerDockerEvent() {

// stop container
case "destroy":
s.Lock()
s.deleteContainerLogConf(msg.ID)
}
}
}

func (s *SidecarController) tickerSyncContainerLogConfs() {
ticker := time.NewTicker(time.Minute * 10)

for {
select {
case <-ticker.C:
s.syncLogConfs()
s.Unlock()
}
}
}

func (s *SidecarController) syncLogConfs() {
//list all running containers
apiContainers, err := s.client.ListContainers(docker.ListContainersOptions{})
apiContainers, err := s.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
blog.Errorf("docker ListContainers failed: %s", err.Error())
return
Expand Down Expand Up @@ -289,50 +275,40 @@ func (s *SidecarController) getContainerLogConfKey(containerId string) string {

func (s *SidecarController) produceContainerLogConf(c *docker.Container) {
key := s.getContainerLogConfKey(c.ID)
s.RLock()
_, ok := s.logConfs[key]
s.RUnlock()
if ok {
blog.V(3).Infof("container %s already under SidecarController manager", c.ID)
y, ok := s.produceLogConfParameterV2(c)
//the container don't match any BcsLogConfig
if !ok {
s.Lock()
defer s.Unlock()
_, ok := s.logConfs[key]
//if the container have logconfig, then delete it
if ok {
s.deleteContainerLogConf(c.ID)
delete(s.logConfs,key)
}
blog.Infof("container %s don't need collect log", c.ID)
return
}

logConf := &ContainerLogConf{
containerId: c.ID,
confPath: key,
}
y, ok := s.produceLogConfParameterV2(c)
if !ok {
logConf.needCollect = false
s.Lock()
s.logConfs[key] = logConf
s.Unlock()
return
}

by, _ := yaml.Marshal(y)
blog.Infof("container %s need been collected log, and LogConfig(%s)", c.ID, string(by))
logConf.needCollect = true
_, err := os.Stat(logConf.confPath)
//if confpath not exist, then create it
f, err := os.Create(logConf.confPath)
if err != nil {
f, err := os.Create(logConf.confPath)
if err != nil {
blog.Errorf("container %s open file %s failed: %s", c.ID, logConf.confPath, err.Error())
return
}
defer f.Close()

_, err = f.Write(by)
if err != nil {
blog.Errorf("container %s tempalte execute failed: %s", c.ID, err.Error())
return
}
blog.Infof("produce container %s log config %s success", c.ID, logConf.confPath)
} else {
blog.Infof("container %s log config %s already exist, then don't need create it", c.ID, logConf.confPath)
blog.Errorf("container %s open file %s failed: %s", c.ID, logConf.confPath, err.Error())
return
}
defer f.Close()

_, err = f.Write(by)
if err != nil {
blog.Errorf("container %s tempalte execute failed: %s", c.ID, err.Error())
return
}
blog.Infof("produce container %s log config %s success", c.ID, logConf.confPath)
s.Lock()
s.logConfs[key] = logConf
s.Unlock()
Expand All @@ -341,25 +317,17 @@ func (s *SidecarController) produceContainerLogConf(c *docker.Container) {

func (s *SidecarController) deleteContainerLogConf(containerId string) {
key := s.getContainerLogConfKey(containerId)
s.RLock()
logConf, ok := s.logConfs[key]
s.RUnlock()
if !ok {
blog.Infof("container %s don't have LogConfig, then ignore", containerId)
return
}

if logConf.needCollect {
err := os.Remove(logConf.confPath)
if err != nil {
blog.Errorf("remove log config %s error %s", logConf.confPath, err.Error())
return
}
err := os.Remove(logConf.confPath)
if err != nil {
blog.Errorf("remove log config %s error %s", logConf.confPath, err.Error())
return
}

s.Lock()
delete(s.logConfs, key)
s.Unlock()
blog.Infof("delete container %s log config success", containerId)
}

Expand Down Expand Up @@ -389,6 +357,7 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
blog.Warnf("container %s don't match BcsLogConfig", container.ID)
return nil, false
}

para.ExtMeta["io_tencent_bcs_cluster"] = logConf.Spec.ClusterId
para.ExtMeta["io_tencent_bcs_namespace"] = pod.Namespace
para.ExtMeta["io_tencent_bcs_server_name"] = pod.OwnerReferences[0].Name
Expand All @@ -414,6 +383,9 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
para.NonstandardPaths = logConf.Spec.LogPaths
para.LogTags = logConf.Spec.LogTags
}
for k,v :=range para.LogTags {
para.ExtMeta[k] = v
}

y := &types.Yaml{Local: make([]types.Local, 0)}
//if stdout container log
Expand Down
38 changes: 31 additions & 7 deletions bcs-services/bcs-logbeat-sidecar/sidecar/logconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *SidecarController) initKubeconfig() error {
blog.Errorf("build kubeclient by kubeconfig %s error %s", s.conf.Kubeconfig, err.Error())
return err
}
factory := informers.NewSharedInformerFactory(kubeClient, time.Minute*10)
factory := informers.NewSharedInformerFactory(kubeClient, 0)
s.podLister = factory.Core().V1().Pods().Lister()
factory.Start(stopCh)
// Wait for all caches to sync.
Expand All @@ -75,7 +75,7 @@ func (s *SidecarController) initKubeconfig() error {
blog.Errorf("build internal clientset by kubeconfig %s error %s", s.conf.Kubeconfig, err.Error())
return err
}
internalFactory := externalversions.NewSharedInformerFactory(internalClientset, time.Minute*10)
internalFactory := externalversions.NewSharedInformerFactory(internalClientset, time.Hour)
s.bcsLogConfigInformer = internalFactory.Bkbcs().V1().BcsLogConfigs().Informer()
s.bcsLogConfigLister = internalFactory.Bkbcs().V1().BcsLogConfigs().Lister()
internalFactory.Start(stopCh)
Expand Down Expand Up @@ -134,21 +134,25 @@ func (s *SidecarController) getPodLogConfigCrd(container *docker.Container, pod
blog.Errorf("list bcslogconfig error %s", err.Error())
return nil
}
if len(bcsLogConfs)==0 {
blog.Warnf("The container clusters don't have any BcsLogConfig")
return nil
}

var highLogConfig *bcsv1.BcsLogConfig
var highScore int
for _, conf := range bcsLogConfs {
blog.Infof("BcsLogConfig(%s) check pod(%s) container(%s)", conf.Name, pod.Name, container.ID)
score := scoreBcsLogConfig(container, pod, conf)
if score > highScore {
blog.Infof("container %s pod(%s) BcsLogConfig(%s) higher score(%d)",
container.ID, pod.Name, highLogConfig.Name, score)
highScore = score
highLogConfig = conf
blog.Infof("container %s pod(%s) BcsLogConfig(%s) higher score(%d)",
container.ID, pod.Name, highLogConfig.Name, score)
}
}
if highLogConfig == nil {
blog.Warnf("container %s pod(%s) not match BcsLogConfig", container.ID, pod.Name)
blog.Warnf("container %s pod(%s) not match BcsLogConfigs", container.ID, pod.Name)
} else {
blog.Infof("container %s pod(%s) match BcsLogConfig(%s)", container.ID, pod.Name, highLogConfig.Name)
}
Expand Down Expand Up @@ -177,9 +181,17 @@ func scoreBcsLogConfig(container *docker.Container, pod *corev1.Pod, bcsLogConf
//BcsLogConfig parameter WorkloadType、WorkloadName、WorkloadNamespace matched, increased 2 score
//else not matched, return 0 score
if bcsLogConf.Spec.WorkloadType != "" {
//if pod don't belong any workload
if len(pod.OwnerReferences)==0 {
blog.Warnf("container %s pod(%s:%s) not match BcsLogConfig(%s:%s) WorkloadType",
container.ID, pod.Name, pod.Namespace, bcsLogConf.Name, bcsLogConf.Spec.WorkloadType)
return 0
}

matched := false
if pod.OwnerReferences[0].Kind == "ReplicaSet" {
if strings.ToLower(bcsLogConf.Spec.WorkloadType) == strings.ToLower("Deployment") {
if strings.ToLower(bcsLogConf.Spec.WorkloadType) == strings.ToLower("Deployment") &&
strings.HasPrefix(pod.OwnerReferences[0].Name, bcsLogConf.Spec.WorkloadName){
score += 2
matched = true
}
Expand Down Expand Up @@ -233,17 +245,29 @@ func scoreBcsLogConfig(container *docker.Container, pod *corev1.Pod, bcsLogConf
//not matched, return 0 score
if len(bcsLogConf.Spec.ContainerConfs) != 0 && !matched {
blog.Warnf("container(%s:%s) pod(%s:%s) not match BcsLogConfig(%s) containerName",
container.ID, container.Config.Labels[ContainerLabelK8sContainerName], pod.Name, bcsLogConf.Name)
container.ID, container.Config.Labels[ContainerLabelK8sContainerName], pod.Name, pod.Name, bcsLogConf.Name)
return 0
}

return score
}

func (s *SidecarController) handleChangedBcsLogConfig(obj interface{}) {
conf, ok := obj.(*bcsv1.BcsLogConfig)
if !ok {
blog.Errorf("cannot convert to *bcsv1.BcsLogConfig: %v", obj)
return
}
blog.Infof("handle kubernetes AddOrDelete event BcsLogConfig(%s:%s)", conf.Name, conf.Namespace)
s.syncLogConfs()
}

func (s *SidecarController) handleUpdatedBcsLogConfig(oldObj, newObj interface{}) {
conf, ok := newObj.(*bcsv1.BcsLogConfig)
if !ok {
blog.Errorf("cannot convert to *bcsv1.BcsLogConfig: %v", newObj)
return
}
blog.Infof("handle kubernetes Update event BcsLogConfig(%s:%s)", conf.Name, conf.Namespace)
s.syncLogConfs()
}
1 change: 1 addition & 0 deletions docs/features/bcs-webhook-server/log-controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ spec:
appId: "20000"
workloadType: Deployment
workloadName: python-webhook
workloadNamespace: python-webhook
containerConfs:
- containerName: python
stdDataId: "2001"
Expand Down

0 comments on commit eb21565

Please sign in to comment.