Skip to content

Commit

Permalink
feature: Mesos方案支持prometheus ServiceMonitor TencentBlueKing#514
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Jul 30, 2020
1 parent f21c482 commit d09ded7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,31 @@ package discovery
import (
"fmt"
"path"
"time"
"sync"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-mesos/pkg/client/informers"
apisbkbcsv2 "github.com/Tencent/bk-bcs/bcs-mesos/pkg/apis/bkbcs/v2"
"github.com/Tencent/bk-bcs/bcs-mesos/pkg/client/internalclientset"
bkbcsv2 "github.com/Tencent/bk-bcs/bcs-mesos/pkg/client/lister/bkbcs/v2"
"github.com/Tencent/bk-bcs/bcs-services/bcs-service-prometheus/types"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/cache"
)

type nodeEtcdDiscovery struct {
sync.RWMutex
kubeconfig string
sdFilePath string
cadvisorPort int
nodeExportPort int
module string

eventHandler EventHandleFunc
nodeLister bkbcsv2.AgentLister
nodeInformer cache.SharedIndexInformer
initSuccess bool
promFilePrefix string
nodes map[string]struct{}
}

// new nodeEtcdDiscovery for discovery node cadvisor targets
Expand All @@ -49,6 +51,7 @@ func NewNodeEtcdDiscovery(kubeconfig string, promFilePrefix, module string, cadv
cadvisorPort: cadvisorPort,
nodeExportPort: nodeExportPort,
module: module,
nodes: make(map[string]struct{}),
}
switch module {
case CadvisorModule:
Expand Down Expand Up @@ -78,36 +81,30 @@ func (disc *nodeEtcdDiscovery) Start() error {
return err
}
internalFactory := informers.NewSharedInformerFactory(internalClientset, 0)
disc.nodeLister = internalFactory.Bkbcs().V2().Agents().Lister()
disc.nodeInformer = internalFactory.Bkbcs().V2().Agents().Informer()
internalFactory.Start(stopCh)
// Wait for all caches to sync.
internalFactory.WaitForCacheSync(stopCh)
blog.Infof("build internalClientset for config %s success", disc.kubeconfig)

go disc.syncTickerPromSdConfig()
disc.initSuccess = true
disc.eventHandler(DiscoveryInfo{Module: disc.module, Key: disc.module})
disc.nodeInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: disc.OnAdd,
UpdateFunc: disc.OnUpdate,
DeleteFunc: disc.OnDelete,
},
)
return nil
}

func (disc *nodeEtcdDiscovery) GetPrometheusSdConfig(module string) ([]*types.PrometheusSdConfig, error) {
nodes, err := disc.nodeLister.List(labels.Everything())
if err != nil {
return nil, err
}

disc.Lock()
disc.Unlock()
promConfigs := make([]*types.PrometheusSdConfig, 0)
for _, node := range nodes {
ip := node.Spec.GetAgentIP()
if ip == "" {
blog.Errorf("discovery %s node %s not found InnerIP", disc.module, node.GetName())
continue
}

for nodeIp, _ := range disc.nodes {
switch disc.module {
case CadvisorModule:
conf := &types.PrometheusSdConfig{
Targets: []string{fmt.Sprintf("%s:%d", ip, disc.cadvisorPort)},
Targets: []string{fmt.Sprintf("%s:%d", nodeIp, disc.cadvisorPort)},
Labels: map[string]string{
DefaultBcsModuleLabelKey: disc.module,
},
Expand All @@ -117,7 +114,7 @@ func (disc *nodeEtcdDiscovery) GetPrometheusSdConfig(module string) ([]*types.Pr

case NodeexportModule:
conf := &types.PrometheusSdConfig{
Targets: []string{fmt.Sprintf("%s:%d", ip, disc.nodeExportPort)},
Targets: []string{fmt.Sprintf("%s:%d", nodeIp, disc.nodeExportPort)},
Labels: map[string]string{
DefaultBcsModuleLabelKey: disc.module,
},
Expand All @@ -139,35 +136,57 @@ func (disc *nodeEtcdDiscovery) RegisterEventFunc(handleFunc EventHandleFunc) {
}

func (disc *nodeEtcdDiscovery) OnAdd(obj interface{}) {
if !disc.initSuccess {
disc.Lock()
defer disc.Unlock()

agent, ok := obj.(*apisbkbcsv2.Agent)
if !ok {
blog.Errorf("cannot convert to *apisbkbcsv2.Agent: %v", obj)
return
}
blog.Infof("recieve Agent(%s) Add event", agent.Name)
ip := agent.Spec.GetAgentIP()
if ip == "" {
blog.Errorf("node %s not found InnerIP", agent.GetName())
return
}
disc.nodes[ip] = struct{}{}

disc.eventHandler(DiscoveryInfo{Module: disc.module, Key: disc.module})
}

// if on update event, then don't need to update sd config
func (disc *nodeEtcdDiscovery) OnUpdate(old, cur interface{}) {
if !disc.initSuccess {
return
}
//do nothing
}

func (disc *nodeEtcdDiscovery) OnDelete(obj interface{}) {
if !disc.initSuccess {
disc.Lock()
defer disc.Unlock()

agent, ok := obj.(*apisbkbcsv2.Agent)
if !ok {
blog.Errorf("cannot convert to *apisbkbcsv2.Agent: %v", obj)
return
}
blog.Infof("recieve Agent(%s) Delete event", agent.Name)
ip := agent.Spec.GetAgentIP()
if ip == "" {
blog.Errorf("node %s not found InnerIP", agent.GetName())
return
}
delete(disc.nodes, ip)

// call event handler
disc.eventHandler(DiscoveryInfo{Module: disc.module, Key: disc.module})
}

func (disc *nodeEtcdDiscovery) syncTickerPromSdConfig() {
/*func (disc *nodeEtcdDiscovery) syncTickerPromSdConfig() {
ticker := time.NewTicker(time.Minute * 5)
select {
case <-ticker.C:
blog.V(3).Infof("ticker sync prometheus service discovery config")
disc.eventHandler(DiscoveryInfo{Module: disc.module, Key: disc.module})
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ func (disc *serviceMonitor) Start() error {
return err
}
internalFactory := informers.NewSharedInformerFactory(internalClientset, 0)
disc.endpointLister = internalFactory.Bkbcs().V2().BcsEndpoints().Lister()
disc.endpointInformer = internalFactory.Bkbcs().V2().BcsEndpoints().Informer()
blog.Infof("build bkbcsClientset for config %s success", disc.kubeconfig)

//init monitor clientset
disc.serviceMonitorLister = internalFactory.Monitor().V1().ServiceMonitors().Lister()
disc.serviceMonitorInformer = internalFactory.Monitor().V1().ServiceMonitors().Informer()
internalFactory.Start(stopCh)
// Wait for all caches to sync.
Expand Down

0 comments on commit d09ded7

Please sign in to comment.