Skip to content

Commit

Permalink
Merge pull request #404 from weaveworks/scope-364
Browse files Browse the repository at this point in the history
reverse resolutions for remote side of connections.
  • Loading branch information
inercia committed Sep 7, 2015
2 parents 9526663 + 7513b4e commit d3c7138
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 26 deletions.
25 changes: 23 additions & 2 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Reporter struct {
includeNAT bool
conntracker *Conntracker
natmapper *natmapper
revResolver *ReverseResolver
}

// SpyDuration is an exported prometheus metric
Expand Down Expand Up @@ -70,6 +71,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
includeProcesses: includeProcesses,
conntracker: conntracker,
natmapper: natmapper,
revResolver: NewReverseResolver(),
}
}

Expand All @@ -81,6 +83,7 @@ func (r *Reporter) Stop() {
if r.natmapper != nil {
r.natmapper.Stop()
}
r.revResolver.Stop()
}

// Report implements Reporter.
Expand Down Expand Up @@ -145,8 +148,17 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
"name": revRemoteName,
})
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any counting here; the merge does it for us.
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
Expand Down Expand Up @@ -177,8 +189,17 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
"name": revRemoteName,
})
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any counting here; the merge does it for us.
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
LocalAddress: fixLocalAddress,
LocalPort: fixLocalPort,
RemoteAddress: fixRemoteAddress,
RemotePort: fixRemotePortB,
RemotePort: fixRemotePort,
Proc: procspy.Proc{
PID: fixProcessPID,
Name: fixProcessName,
Expand Down
76 changes: 76 additions & 0 deletions probe/endpoint/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package endpoint

import (
"net"
"strings"
"time"

"github.com/bluele/gcache"
)

const (
rAddrCacheLen = 500 // Default cache length
rAddrBacklog = 1000
rAddrCacheExpiration = 30 * time.Minute
)

type revResFunc func(addr string) (names []string, err error)

// ReverseResolver is a caching, reverse resolver.
type ReverseResolver struct {
addresses chan string
cache gcache.Cache
Throttle <-chan time.Time // Made public for mocking
Resolver revResFunc
}

// NewReverseResolver starts a new reverse resolver that performs reverse
// resolutions and caches the result.
func NewReverseResolver() *ReverseResolver {
r := ReverseResolver{
addresses: make(chan string, rAddrBacklog),
cache: gcache.New(rAddrCacheLen).LRU().Expiration(rAddrCacheExpiration).Build(),
Throttle: time.Tick(time.Second / 10),
Resolver: net.LookupAddr,
}
go r.loop()
return &r
}

// Get the reverse resolution for an IP address if already in the cache, a
// gcache.NotFoundKeyError error otherwise. Note: it returns one of the
// possible names that can be obtained for that IP.
func (r *ReverseResolver) Get(address string) (string, error) {
val, err := r.cache.Get(address)
if err == nil {
return val.(string), nil
}
if err == gcache.NotFoundKeyError {
// We trigger a asynchronous reverse resolution when not cached
select {
case r.addresses <- address:
default:
}
}
return "", err
}

func (r *ReverseResolver) loop() {
for request := range r.addresses {
<-r.Throttle // rate limit our DNS resolutions
// and check if the answer is already in the cache
if _, err := r.cache.Get(request); err == nil {
continue
}
names, err := r.Resolver(request)
if err == nil && len(names) > 0 {
name := strings.TrimRight(names[0], ".")
r.cache.Set(request, name)
}
}
}

// Stop the async reverse resolver.
func (r *ReverseResolver) Stop() {
close(r.addresses)
}
38 changes: 38 additions & 0 deletions probe/endpoint/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package endpoint_test

import (
"errors"
"testing"
"time"

. "github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/test"
)

func TestReverseResolver(t *testing.T) {
tests := map[string]string{
"1.2.3.4": "test.domain.name",
"4.3.2.1": "im.a.little.tea.pot",
}

revRes := NewReverseResolver()
defer revRes.Stop()

// Use a mocked resolver function.
revRes.Resolver = func(addr string) (names []string, err error) {
if name, ok := tests[addr]; ok {
return []string{name}, nil
}
return []string{}, errors.New("invalid IP")
}

// Up the rate limit so the test runs faster.
revRes.Throttle = time.Tick(time.Millisecond)

for ip, hostname := range tests {
test.Poll(t, 100*time.Millisecond, hostname, func() interface{} {
result, _ := revRes.Get(ip)
return result
})
}
}
11 changes: 7 additions & 4 deletions render/detailed_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,25 @@ func OriginTable(r report.Report, originID string, addHostTags bool, addContaine

func connectionDetailsRows(topology report.Topology, originID string) []Row {
rows := []Row{}
labeler := func(nodeID string) (string, bool) {
labeler := func(nodeID string, meta map[string]string) (string, bool) {
if _, addr, port, ok := report.ParseEndpointNodeID(nodeID); ok {
if name, ok := meta["name"]; ok {
return fmt.Sprintf("%s:%s", name, port), true
}
return fmt.Sprintf("%s:%s", addr, port), true
}
if _, addr, ok := report.ParseAddressNodeID(nodeID); ok {
return addr, true
}
return "", false
}
local, ok := labeler(originID)
local, ok := labeler(originID, topology.Nodes[originID].Metadata)
if !ok {
return rows
}
// Firstly, collection outgoing connections from this node.
for _, serverNodeID := range topology.Nodes[originID].Adjacency {
remote, ok := labeler(serverNodeID)
remote, ok := labeler(serverNodeID, topology.Nodes[serverNodeID].Metadata)
if !ok {
continue
}
Expand All @@ -239,7 +242,7 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row {
if !serverNodeIDs.Contains(originID) {
continue
}
remote, ok := labeler(clientNodeID)
remote, ok := labeler(clientNodeID, clientNode.Metadata)
if !ok {
continue
}
Expand Down
47 changes: 28 additions & 19 deletions report/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
)

// Topology describes a specific view of a network. It consists of nodes and
// edges, and metadata about those nodes and edges, represented by EdgeMetadatas
// and Nodes respectively. Edges are directional, and embedded in the
// Node struct.
// edges, and metadata about those nodes and edges, represented by
// EdgeMetadatas and Nodes respectively. Edges are directional, and embedded
// in the Node struct.
type Topology struct {
Nodes
}
Expand All @@ -20,8 +20,9 @@ func MakeTopology() Topology {
}
}

// WithNode produces a topology from t, with nmd added under key nodeID; if a node already exists
// for this key, nmd is merged with that node. NB A fresh topology is returned.
// WithNode produces a topology from t, with nmd added under key nodeID; if a
// node already exists for this key, nmd is merged with that node. Note that a
// fresh topology is returned.
func (t Topology) WithNode(nodeID string, nmd Node) Topology {
if existing, ok := t.Nodes[nodeID]; ok {
nmd = nmd.Merge(existing)
Expand Down Expand Up @@ -70,9 +71,9 @@ func (n Nodes) Merge(other Nodes) Nodes {
return cp
}

// Node describes a superset of the metadata that probes can collect
// about a given node in a given topology, along with the edges emanating
// from the node and metadata about those edges.
// Node describes a superset of the metadata that probes can collect about a
// given node in a given topology, along with the edges emanating from the
// node and metadata about those edges.
type Node struct {
Metadata `json:"-"`
Counters `json:"-"`
Expand Down Expand Up @@ -102,14 +103,21 @@ func (n Node) WithMetadata(m map[string]string) Node {
return result
}

// WithCounters returns a fresh copy of n, with Counters set to c
// AddMetadata returns a fresh copy of n, with Metadata set to the merge of n
// and the metadata provided.
func (n Node) AddMetadata(m map[string]string) Node {
additional := MakeNodeWith(m)
return n.Merge(additional)
}

// WithCounters returns a fresh copy of n, with Counters set to c.
func (n Node) WithCounters(c map[string]int) Node {
result := n.Copy()
result.Counters = c
return result
}

// WithAdjacency returns a fresh copy of n, with Adjacency set to a
// WithAdjacency returns a fresh copy of n, with Adjacency set to a.
func (n Node) WithAdjacency(a IDList) Node {
result := n.Copy()
result.Adjacency = a
Expand All @@ -123,7 +131,8 @@ func (n Node) WithAdjacent(a string) Node {
return result
}

// WithEdge returns a fresh copy of n, with 'dst' added to Adjacency and md added to EdgeMetadata
// WithEdge returns a fresh copy of n, with 'dst' added to Adjacency and md
// added to EdgeMetadata.
func (n Node) WithEdge(dst string, md EdgeMetadata) Node {
result := n.Copy()
result.Adjacency = result.Adjacency.Add(dst)
Expand Down Expand Up @@ -152,7 +161,7 @@ func (n Node) Merge(other Node) Node {
return cp
}

// Metadata is a string->string map
// Metadata is a string->string map.
type Metadata map[string]string

// Merge merges two node metadata maps together. In case of conflict, the
Expand All @@ -166,7 +175,7 @@ func (m Metadata) Merge(other Metadata) Metadata {
return result
}

// Copy creates a deep copy of the Metadata
// Copy creates a deep copy of the Metadata.
func (m Metadata) Copy() Metadata {
result := Metadata{}
for k, v := range m {
Expand All @@ -175,11 +184,11 @@ func (m Metadata) Copy() Metadata {
return result
}

// Counters is a string->int map
// Counters is a string->int map.
type Counters map[string]int

// Merge merges two sets of counters into a fresh set of counters,
// summing values where appropriate
// Merge merges two sets of counters into a fresh set of counters, summing
// values where appropriate.
func (c Counters) Merge(other Counters) Counters {
result := c.Copy()
for k, v := range other {
Expand All @@ -188,7 +197,7 @@ func (c Counters) Merge(other Counters) Counters {
return result
}

// Copy creates a deep copy of the Counters
// Copy creates a deep copy of the Counters.
func (c Counters) Copy() Counters {
result := Counters{}
for k, v := range c {
Expand All @@ -197,8 +206,8 @@ func (c Counters) Copy() Counters {
return result
}

// EdgeMetadatas collect metadata about each edge in a topology. Keys are
// the remote node IDs, as in Adjacency.
// EdgeMetadatas collect metadata about each edge in a topology. Keys are the
// remote node IDs, as in Adjacency.
type EdgeMetadatas map[string]EdgeMetadata

// Copy returns a value copy of the EdgeMetadatas.
Expand Down

0 comments on commit d3c7138

Please sign in to comment.