Skip to content

Commit

Permalink
pgraph, resources: Major refactoring continued
Browse files Browse the repository at this point in the history
There was simply some technical debt I needed to kill off. Sorry for not
splitting this up into more patches.
  • Loading branch information
purpleidea committed May 29, 2017
1 parent 6eaf51c commit 2a5eaeb
Show file tree
Hide file tree
Showing 28 changed files with 749 additions and 699 deletions.
2 changes: 1 addition & 1 deletion docs/resource-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ This can _only_ be done inside of the `CheckApply` function!
```golang
// inside CheckApply, probably near the top
if val, exists := obj.Recv["SomeKey"]; exists {
log.Printf("SomeKey was sent to us from: %s[%s].%s", val.Res.GetKind(), val.Res.GetName(), val.Key)
log.Printf("SomeKey was sent to us from: %s.%s", val.Res, val.Key)
if val.Changed {
log.Printf("SomeKey was just updated!")
// you may want to invalidate some local cache
Expand Down
10 changes: 10 additions & 0 deletions examples/file0.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
graph: mygraph
resources:
file:
- name: file0
path: "/tmp/mgmt/f1"
content: |
i am f0
state: exists
edges: []
7 changes: 7 additions & 0 deletions examples/noop0.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
graph: mygraph
comment: simple noop example
resources:
noop:
- name: noop0
edges: []
120 changes: 60 additions & 60 deletions lib/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ func (obj *Main) Run() error {
// TODO: Import admin key
}

var G, oldGraph *pgraph.Graph
oldGraph := &pgraph.Graph{}
graph := &resources.MGraph{}
// pass in the information we need
graph.Debug = obj.Flags.Debug
graph.Init()

// exit after `max-runtime` seconds for no reason at all...
if i := obj.MaxRuntime; i > 0 {
Expand Down Expand Up @@ -367,6 +371,15 @@ func (obj *Main) Run() error {
EmbdEtcd: EmbdEtcd,
}

graph.Data = &resources.Data{
Hostname: hostname,
Converger: converger,
Prometheus: prom,
World: world,
Prefix: pgraphPrefix,
Debug: obj.Flags.Debug,
}

var gapiChan chan error // stream events are nil errors
if obj.GAPI != nil {
data := gapi.Data{
Expand Down Expand Up @@ -425,10 +438,10 @@ func (obj *Main) Run() error {
// we need the vertices to be paused to work on them, so
// run graph vertex LOCK...
if !first { // TODO: we can flatten this check out I think
converger.Pause() // FIXME: add sync wait?
resources.Pause(G, false) // sync
converger.Pause() // FIXME: add sync wait?
graph.Pause(false) // sync

//G.UnGroup() // FIXME: implement me if needed!
//graph.UnGroup() // FIXME: implement me if needed!
}

// make the graph from yaml, lib, puppet->yaml, or dsl!
Expand All @@ -437,23 +450,20 @@ func (obj *Main) Run() error {
log.Printf("Main: Error creating new graph: %v", err)
// unpause!
if !first {
resources.Start(G, first) // sync
converger.Start() // after G.Start()
graph.Start(first) // sync
converger.Start() // after Start()
}
continue
}
newGraph.SetValue("debug", obj.Flags.Debug)
// pass in the information we need
associateData(newGraph, &resources.Data{
Hostname: hostname,
Converger: converger,
Prometheus: prom,
World: world,
Prefix: pgraphPrefix,
Debug: obj.Flags.Debug,
})

for _, m := range graphMetas(newGraph) {
if obj.Flags.Debug {
log.Printf("Main: New Graph: %v", newGraph)
}

// this edits the paused vertices, but it is safe to do
// so even if we don't use this new graph, since those
// value should be the same for existing vertices...
for _, v := range newGraph.Vertices() {
m := resources.VtoR(v).Meta()
// apply the global noop parameter if requested
if obj.Noop {
m.Noop = obj.Noop
Expand All @@ -466,48 +476,59 @@ func (obj *Main) Run() error {
}
}

// FIXME: make sure we "UnGroup()" any semi-destructive
// changes to the resources so our efficient GraphSync
// will be able to re-use and cmp to the old graph.
// We don't have to "UnGroup()" to compare, since we
// save the old graph to use when we compare.
// TODO: Does this hurt performance or graph changes ?
log.Printf("Main: GraphSync...")
newFullGraph, err := resources.GraphSync(newGraph, oldGraph)
if err != nil {
vertexCmpFn := func(v1, v2 pgraph.Vertex) (bool, error) {
return resources.VtoR(v1).Compare(resources.VtoR(v2)), nil
}
vertexAddFn := func(v pgraph.Vertex) error {
err := resources.VtoR(v).Validate()
return errwrap.Wrapf(err, "could not Validate() resource")
}
vertexRemoveFn := func(v pgraph.Vertex) error {
// wait for exit before starting new graph!
resources.VtoR(v).Exit() // sync
return nil
}
// on success, this updates the receiver graph...
if err := oldGraph.GraphSync(newGraph, vertexCmpFn, vertexAddFn, vertexRemoveFn); err != nil {
log.Printf("Main: Error running graph sync: %v", err)
// unpause!
if !first {
resources.Start(G, first) // sync
converger.Start() // after Start(G)
graph.Start(first) // sync
converger.Start() // after Start()
}
continue
}
oldGraph = newFullGraph // save old graph
G = oldGraph.Copy() // copy to active graph
graph.Update(oldGraph) // copy in structure of new graph

resources.AutoEdges(G) // add autoedges; modifies the graph
resources.AutoGroup(G, &resources.NonReachabilityGrouper{}) // run autogroup; modifies the graph
resources.AutoEdges(graph.Graph) // add autoedges; modifies the graph
resources.AutoGroup(graph.Graph, &resources.NonReachabilityGrouper{}) // run autogroup; modifies the graph
// TODO: do we want to do a transitive reduction?
// FIXME: run a type checker that verifies all the send->recv relationships

// Call this here because at this point the graph does not
// know anything about the prometheus instance.
// Call this here because at this point the graph does
// not know anything about the prometheus instance.
if err := prom.UpdatePgraphStartTime(); err != nil {
log.Printf("Main: Prometheus.UpdatePgraphStartTime() errored: %v", err)
}
// Start(G) needs to be synchronous or wait,
// Start() needs to be synchronous or wait,
// because if half of the nodes are started and
// some are not ready yet and the EtcdWatch
// loops, we'll cause Pause(G) before we
// loops, we'll cause Pause() before we
// even got going, thus causing nil pointer errors
resources.Start(G, first) // sync
converger.Start() // after Start(G)
graph.Start(first) // sync
converger.Start() // after Start()

log.Printf("Main: Graph: %v", G) // show graph
log.Printf("Main: Graph: %v", graph) // show graph
if obj.Graphviz != "" {
filter := obj.GraphvizFilter
if filter == "" {
filter = "dot" // directed graph default
}
if err := G.ExecGraphviz(filter, obj.Graphviz, hostname); err != nil {
if err := graph.ExecGraphviz(filter, obj.Graphviz, hostname); err != nil {
log.Printf("Main: Graphviz: %v", err)
} else {
log.Printf("Main: Graphviz: Successfully generated graph!")
Expand Down Expand Up @@ -590,7 +611,7 @@ func (obj *Main) Run() error {
// tell inner main loop to exit
close(exitchan)

resources.Exit(G) // tells all the children to exit, and waits for them to do so
graph.Exit() // tells all the children to exit, and waits for them to do so

// cleanup etcd main loop last so it can process everything first
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
Expand All @@ -607,31 +628,10 @@ func (obj *Main) Run() error {
}

if obj.Flags.Debug {
log.Printf("Main: Graph: %v", G)
log.Printf("Main: Graph: %v", graph)
}

// TODO: wait for each vertex to exit...
log.Println("Goodbye!")
return reterr
}

// graphMetas returns a list of pointers to each of the resource MetaParams.
func graphMetas(g *pgraph.Graph) []*resources.MetaParams {
metas := []*resources.MetaParams{}
for _, v := range g.Vertices() { // loop through the vertices (resources)
res := resources.VtoR(v) // resource
meta := res.Meta()
metas = append(metas, meta)
}
return metas
}

// associateData associates some data with the object in the graph in question.
func associateData(g *pgraph.Graph, data *resources.Data) {
// prometheus needs to be associated to this graph as well
g.SetValue("prometheus", data.Prometheus)

for _, v := range g.Vertices() {
*resources.VtoR(v).Data() = *data
}
}
124 changes: 124 additions & 0 deletions pgraph/graphsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgraph

import (
"fmt"

errwrap "github.com/pkg/errors"
)

// GraphSync updates the Graph so that it matches the newGraph. It leaves
// identical elements alone so that they don't need to be refreshed.
// It tries to mutate existing elements into new ones, if they support this.
// This updates the Graph on success only.
// FIXME: should we do this with copies of the vertex resources?
// FIXME: add test cases
func (obj *Graph) GraphSync(newGraph *Graph, vertexCmpFn func(Vertex, Vertex) (bool, error), vertexAddFn func(Vertex) error, vertexRemoveFn func(Vertex) error) error {

oldGraph := obj.Copy() // work on a copy of the old graph
if oldGraph == nil {
var err error
oldGraph, err = NewGraph(newGraph.GetName()) // copy over the name
if err != nil {
return errwrap.Wrapf(err, "GraphSync failed")
}
}
oldGraph.SetName(newGraph.GetName()) // overwrite the name

var lookup = make(map[Vertex]Vertex)
var vertexKeep []Vertex // list of vertices which are the same in new graph
var edgeKeep []*Edge // list of vertices which are the same in new graph

for v := range newGraph.Adjacency() { // loop through the vertices (resources)
var vertex Vertex
// step one, direct compare with res.Compare
if vertex == nil { // redundant guard for consistency
fn := func(vv Vertex) (bool, error) {
b, err := vertexCmpFn(vv, v)
return b, errwrap.Wrapf(err, "vertexCmpFn failed")
}
var err error
vertex, err = oldGraph.VertexMatchFn(fn)
if err != nil {
return errwrap.Wrapf(err, "VertexMatchFn failed")
}
}

// TODO: consider adding a mutate API.
// step two, try and mutate with res.Mutate
//if vertex == nil { // not found yet...
// vertex = oldGraph.MutateMatch(res)
//}

if vertex == nil { // no match found yet
if err := vertexAddFn(v); err != nil {
return errwrap.Wrapf(err, "vertexAddFn failed")
}
vertex = v
oldGraph.AddVertex(vertex) // call standalone in case not part of an edge
}
lookup[v] = vertex // used for constructing edges
vertexKeep = append(vertexKeep, vertex) // append
}

// get rid of any vertices we shouldn't keep (that aren't in new graph)
for v := range oldGraph.Adjacency() {
if !VertexContains(v, vertexKeep) {
if err := vertexRemoveFn(v); err != nil {
return errwrap.Wrapf(err, "vertexRemoveFn failed")
}
oldGraph.DeleteVertex(v)
}
}

// compare edges
for v1 := range newGraph.Adjacency() { // loop through the vertices (resources)
for v2, e := range newGraph.Adjacency()[v1] {
// we have an edge!
// lookup vertices (these should exist now)
vertex1, exists1 := lookup[v1]
vertex2, exists2 := lookup[v2]
if !exists1 || !exists2 { // no match found, bug?
//if vertex1 == nil || vertex2 == nil { // no match found
return fmt.Errorf("new vertices weren't found") // programming error
}

edge, exists := oldGraph.Adjacency()[vertex1][vertex2]
if !exists || edge.Name != e.Name { // TODO: edgeCmp
edge = e // use or overwrite edge
}
oldGraph.Adjacency()[vertex1][vertex2] = edge // store it (AddEdge)
edgeKeep = append(edgeKeep, edge) // mark as saved
}
}

// delete unused edges
for v1 := range oldGraph.Adjacency() {
for _, e := range oldGraph.Adjacency()[v1] {
// we have an edge!
if !EdgeContains(e, edgeKeep) {
oldGraph.DeleteEdge(e)
}
}
}

// success
*obj = *oldGraph // save old graph
return nil
}
15 changes: 12 additions & 3 deletions pgraph/pgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ type Edge struct {

// Init initializes the graph which populates all the internal structures.
func (g *Graph) Init() error {
if g.Name == "" {
if g.Name == "" { // FIXME: is this really a good requirement?
return fmt.Errorf("can't initialize graph with empty name")
}

g.adjacency = make(map[Vertex]map[Vertex]*Edge)
g.kv = make(map[string]interface{})
//g.adjacency = make(map[Vertex]map[Vertex]*Edge) // not required
//g.kv = make(map[string]interface{}) // not required
return nil
}

Expand Down Expand Up @@ -106,11 +106,17 @@ func (g *Graph) Value(key string) (interface{}, bool) {

// SetValue sets a value to be stored alongside the graph in a particular key.
func (g *Graph) SetValue(key string, val interface{}) {
if g.kv == nil { // initialize on first use
g.kv = make(map[string]interface{})
}
g.kv[key] = val
}

// Copy makes a copy of the graph struct.
func (g *Graph) Copy() *Graph {
if g == nil { // allow nil graphs through
return g
}
newGraph := &Graph{
Name: g.Name,
adjacency: make(map[Vertex]map[Vertex]*Edge, len(g.adjacency)),
Expand All @@ -134,6 +140,9 @@ func (g *Graph) SetName(name string) {

// AddVertex uses variadic input to add all listed vertices to the graph.
func (g *Graph) AddVertex(xv ...Vertex) {
if g.adjacency == nil { // initialize on first use
g.adjacency = make(map[Vertex]map[Vertex]*Edge)
}
for _, v := range xv {
if _, exists := g.adjacency[v]; !exists {
g.adjacency[v] = make(map[Vertex]*Edge)
Expand Down
Loading

0 comments on commit 2a5eaeb

Please sign in to comment.