Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce probe cpu usage #470

Merged
merged 9 commits into from
Sep 11, 2015
13 changes: 7 additions & 6 deletions experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ func main() {
if err != nil {
log.Fatal(err)
}
rp := xfer.NewReportPublisher(publisher)

rand.Seed(time.Now().UnixNano())
for range time.Tick(*publishInterval) {
if err := publisher.Publish(demoReport(*hostCount)); err != nil {
if err := rp.Publish(demoReport(*hostCount)); err != nil {
log.Print(err)
}
}
Expand Down Expand Up @@ -84,14 +85,14 @@ func demoReport(nodeCount int) report.Report {
)

// Endpoint topology
r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
process.PID: "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
process.PID: "4000",
"name": c.dstProc,
"domain": "node-" + dst,
Expand All @@ -100,15 +101,15 @@ func demoReport(nodeCount int) report.Report {
}))

// Address topology
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
docker.Name: src,
}).WithAdjacent(dstAddressID))
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
docker.Name: dst,
}).WithAdjacent(srcAddressID))

// Host data
r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{
r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"host_name": "host-x",
"local_networks": localNet.String(),
Expand Down
3 changes: 2 additions & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func main() {
log.Fatal(err)
}

rp := xfer.NewReportPublisher(publisher)
for range time.Tick(*publishInterval) {
publisher.Publish(fixedReport)
rp.Publish(fixedReport)
}
}
10 changes: 5 additions & 5 deletions experimental/genreport/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func DemoReport(nodeCount int) report.Report {
)

// Endpoint topology
r.Endpoint = r.Endpoint.WithNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
r.Endpoint = r.Endpoint.AddNode(srcPortID, report.MakeNode().WithMetadata(map[string]string{
"pid": "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
r.Endpoint = r.Endpoint.WithNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithMetadata(map[string]string{
"pid": "4000",
"name": c.dstProc,
"domain": "node-" + dst,
Expand All @@ -80,15 +80,15 @@ func DemoReport(nodeCount int) report.Report {
}))

// Address topology
r.Address = r.Address.WithNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithMetadata(map[string]string{
"name": src,
}).WithAdjacent(dstAddressID))
r.Address = r.Address.WithNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{
r.Address = r.Address.AddNode(dstAddressID, report.MakeNode().WithMetadata(map[string]string{
"name": dst,
}).WithAdjacent(srcAddressID))

// Host data
r.Host = r.Host.WithNode("hostX", report.MakeNodeWith(map[string]string{
r.Host = r.Host.AddNode("hostX", report.MakeNodeWith(map[string]string{
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"host_name": "host-x",
"local_networks": localNet.String(),
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var ConntrackModulePresent = func() bool {

// NB this is not re-entrant!
func (c *Conntracker) run(args ...string) {
args = append([]string{"-E", "-o", "xml"}, args...)
args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
})
}

rpt.Address = rpt.Address.WithNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.WithNode(remoteAddressNodeID, remoteNode)
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
}

// Update endpoint topology
Expand Down Expand Up @@ -225,8 +225,8 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
}
rpt.Endpoint = rpt.Endpoint.WithNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.WithNode(remoteEndpointNodeID, remoteNode)
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
}
}

Expand Down
44 changes: 35 additions & 9 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func main() {
var (
endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
tickers = []Ticker{processCache}
reporters = []Reporter{
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
Expand Down Expand Up @@ -142,6 +143,7 @@ func main() {
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)
}
Expand Down Expand Up @@ -173,31 +175,35 @@ func main() {
pubTick = time.Tick(*publishInterval)
spyTick = time.Tick(*spyInterval)
r = report.MakeReport()
p = xfer.NewReportPublisher(publishers)
)

for {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
r.Window = *publishInterval
if err := publishers.Publish(r); err != nil {
if err := p.Publish(r); err != nil {
log.Printf("publish: %v", err)
}
r = report.MakeReport()

case <-spyTick:
if err := processCache.Update(); err != nil {
log.Printf("error reading processes: %v", err)
}
for _, reporter := range reporters {
newReport, err := reporter.Report()
if err != nil {
log.Printf("error generating report: %v", err)
start := time.Now()

for _, ticker := range tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
r = r.Merge(newReport)
}

r = r.Merge(doReport(reporters))
r = Apply(r, taggers)

if took := time.Since(start); took > *spyInterval {
log.Printf("report generation took too long (%s)", took)
}

case <-quit:
return
}
Expand All @@ -206,6 +212,26 @@ func main() {
log.Printf("%s", <-interrupt())
}

// doReport invokes Report on every reporter concurrently, one goroutine per
// reporter, and merges everything they produce into a single report.
//
// A reporter that returns an error is logged and contributes an empty report
// instead, so the remaining reporters' results are still merged.
func doReport(reporters []Reporter) report.Report {
	count := len(reporters)

	// Capacity equals the number of senders, so no goroutine blocks on send.
	results := make(chan report.Report, count)

	collect := func(r Reporter) {
		rpt, err := r.Report()
		if err != nil {
			log.Printf("error generating report: %v", err)
			rpt = report.MakeReport() // empty is OK to merge
		}
		results <- rpt
	}
	for _, r := range reporters {
		go collect(r)
	}

	// Receive exactly one report per reporter, merging in arrival order.
	merged := report.MakeReport()
	for received := 0; received < count; received++ {
		merged = merged.Merge(<-results)
	}
	return merged
}

func interrupt() chan os.Signal {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
Expand Down
82 changes: 44 additions & 38 deletions probe/overlay/weave.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"

"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/probe/docker"
Expand Down Expand Up @@ -43,6 +44,9 @@ var ipMatch = regexp.MustCompile(`([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3
type Weave struct {
// url is the address that Tick issues its HTTP GET status request against.
url string
hostID string

// mtx guards status: Tick takes the write lock to store a freshly decoded
// status, while Tag and Report take the read lock to consume it.
mtx sync.RWMutex
// status is the most recent weaveStatus successfully decoded by Tick.
status weaveStatus
}

type weaveStatus struct {
Expand Down Expand Up @@ -75,24 +79,32 @@ func NewWeave(hostID, weaveRouterAddress string) (*Weave, error) {
}, nil
}

func (w Weave) update() (weaveStatus, error) {
var result weaveStatus
// Tick implements Ticker
func (w *Weave) Tick() error {
req, err := http.NewRequest("GET", w.url, nil)
if err != nil {
return result, err
return err
}
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return result, err
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return result, fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
return fmt.Errorf("Weave Tagger: got %d", resp.StatusCode)
}

var result weaveStatus
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}

return result, json.NewDecoder(resp.Body).Decode(&result)
w.mtx.Lock()
defer w.mtx.Unlock()
w.status = result
return nil
}

type psEntry struct {
Expand All @@ -101,7 +113,7 @@ type psEntry struct {
ips []string
}

func (w Weave) ps() ([]psEntry, error) {
func (w *Weave) ps() ([]psEntry, error) {
var result []psEntry
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
Expand Down Expand Up @@ -132,30 +144,13 @@ func (w Weave) ps() ([]psEntry, error) {
return result, scanner.Err()
}

func (w Weave) tagContainer(r report.Report, containerIDPrefix, macAddress string, ips []string) {
for nodeid, nmd := range r.Container.Nodes {
idPrefix := nmd.Metadata[docker.ContainerID][:12]
if idPrefix != containerIDPrefix {
continue
}

existingIPs := report.MakeIDList(docker.ExtractContainerIPs(nmd)...)
existingIPs = existingIPs.Add(ips...)
nmd.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ")
nmd.Metadata[WeaveMACAddress] = macAddress
r.Container.Nodes[nodeid] = nmd
break
}
}

// Tag implements Tagger.
func (w Weave) Tag(r report.Report) (report.Report, error) {
status, err := w.update()
if err != nil {
return r, nil
}
func (w *Weave) Tag(r report.Report) (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()

for _, entry := range status.DNS.Entries {
// Put information from weaveDNS on the container nodes
for _, entry := range w.status.DNS.Entries {
if entry.Tombstone > 0 {
continue
}
Expand All @@ -167,28 +162,39 @@ func (w Weave) Tag(r report.Report) (report.Report, error) {
hostnames := report.IDList(strings.Fields(node.Metadata[WeaveDNSHostname]))
hostnames = hostnames.Add(strings.TrimSuffix(entry.Hostname, "."))
node.Metadata[WeaveDNSHostname] = strings.Join(hostnames, " ")
r.Container.Nodes[nodeID] = node
}

// Put information from weave ps on the container nodes
psEntries, err := w.ps()
if err != nil {
return r, nil
}
containersByPrefix := map[string]report.Node{}
for _, node := range r.Container.Nodes {
prefix := node.Metadata[docker.ContainerID][:12]
containersByPrefix[prefix] = node
}
for _, e := range psEntries {
w.tagContainer(r, e.containerIDPrefix, e.macAddress, e.ips)
node, ok := containersByPrefix[e.containerIDPrefix]
if !ok {
continue
}

existingIPs := report.MakeIDList(docker.ExtractContainerIPs(node)...)
existingIPs = existingIPs.Add(e.ips...)
node.Metadata[docker.ContainerIPs] = strings.Join(existingIPs, " ")
node.Metadata[WeaveMACAddress] = e.macAddress
}
return r, nil
}

// Report implements Reporter.
func (w Weave) Report() (report.Report, error) {
r := report.MakeReport()
status, err := w.update()
if err != nil {
return r, err
}
func (w *Weave) Report() (report.Report, error) {
w.mtx.RLock()
defer w.mtx.RUnlock()

for _, peer := range status.Router.Peers {
r := report.MakeReport()
for _, peer := range w.status.Router.Peers {
r.Overlay.Nodes[report.MakeOverlayNodeID(peer.Name)] = report.MakeNodeWith(map[string]string{
WeavePeerName: peer.Name,
WeavePeerNickName: peer.NickName,
Expand Down
2 changes: 2 additions & 0 deletions probe/overlay/weave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
t.Fatal(err)
}

w.Tick()

{
have, err := w.Report()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions probe/process/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (c *CachingWalker) Walk(f func(Process)) error {
return nil
}

// Update updates cached copy of process list
func (c *CachingWalker) Update() error {
// Tick updates cached copy of process list
func (c *CachingWalker) Tick() error {
newCache := []Process{}
err := c.source.Walk(func(p Process) {
newCache = append(newCache, p)
Expand Down
Loading