From 2a5eaeb1c9726527a9829bf6efdf2e6a6c811517 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sat, 27 May 2017 19:46:57 -0400 Subject: [PATCH] pgraph, resources: Major refactoring continued There was simply some technical debt I needed to kill off. Sorry for not splitting this up into more patches. --- docs/resource-guide.md | 2 +- examples/file0.yaml | 10 + examples/noop0.yaml | 7 + lib/main.go | 120 ++++++------ pgraph/graphsync.go | 124 ++++++++++++ pgraph/pgraph.go | 15 +- pgraph/pgraph_test.go | 22 ++- resources/actions.go | 427 ++++++++++++++--------------------------- resources/augeas.go | 10 +- resources/autoedge.go | 2 +- resources/autogroup.go | 15 +- resources/exec.go | 8 +- resources/file.go | 12 +- resources/kv.go | 6 +- resources/mgraph.go | 210 ++++++++++++++++++++ resources/msg.go | 2 +- resources/noop.go | 2 +- resources/nspawn.go | 16 +- resources/password.go | 8 +- resources/pgraph.go | 189 ------------------ resources/pkg.go | 4 +- resources/resources.go | 138 ++++++------- resources/sendrecv.go | 21 +- resources/svc.go | 10 +- resources/timer.go | 2 +- resources/virt.go | 58 +++--- yamlgraph/gconfig.go | 4 +- yamlgraph2/gconfig.go | 4 +- 28 files changed, 749 insertions(+), 699 deletions(-) create mode 100644 examples/file0.yaml create mode 100644 examples/noop0.yaml create mode 100644 pgraph/graphsync.go create mode 100644 resources/mgraph.go delete mode 100644 resources/pgraph.go diff --git a/docs/resource-guide.md b/docs/resource-guide.md index f6d6fc80b7..11fc245b26 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -516,7 +516,7 @@ This can _only_ be done inside of the `CheckApply` function! ```golang // inside CheckApply, probably near the top if val, exists := obj.Recv["SomeKey"]; exists { - log.Printf("SomeKey was sent to us from: %s[%s].%s", val.Res.GetKind(), val.Res.GetName(), val.Key) + log.Printf("SomeKey was sent to us from: %s.%s", val.Res, val.Key) if val.Changed { log.Printf("SomeKey was just updated!") // you may want to invalidate some local cache diff --git a/examples/file0.yaml b/examples/file0.yaml new file mode 100644 index 0000000000..05d3777bea --- /dev/null +++ b/examples/file0.yaml @@ -0,0 +1,10 @@ +--- +graph: mygraph +resources: + file: + - name: file0 + path: "/tmp/mgmt/f1" + content: | + i am f0 + state: exists +edges: [] diff --git a/examples/noop0.yaml b/examples/noop0.yaml new file mode 100644 index 0000000000..912a008140 --- /dev/null +++ b/examples/noop0.yaml @@ -0,0 +1,7 @@ +--- +graph: mygraph +comment: simple noop example +resources: + noop: + - name: noop0 +edges: [] diff --git a/lib/main.go b/lib/main.go index ede395c1aa..a8393f4f01 100644 --- a/lib/main.go +++ b/lib/main.go @@ -298,7 +298,11 @@ func (obj *Main) Run() error { // TODO: Import admin key } - var G, oldGraph *pgraph.Graph + oldGraph := &pgraph.Graph{} + graph := &resources.MGraph{} + // pass in the information we need + graph.Debug = obj.Flags.Debug + graph.Init() // exit after `max-runtime` seconds for no reason at all... if i := obj.MaxRuntime; i > 0 { @@ -367,6 +371,15 @@ func (obj *Main) Run() error { EmbdEtcd: EmbdEtcd, } + graph.Data = &resources.Data{ + Hostname: hostname, + Converger: converger, + Prometheus: prom, + World: world, + Prefix: pgraphPrefix, + Debug: obj.Flags.Debug, + } + var gapiChan chan error // stream events are nil errors if obj.GAPI != nil { data := gapi.Data{ @@ -425,10 +438,10 @@ func (obj *Main) Run() error { // we need the vertices to be paused to work on them, so // run graph vertex LOCK... 
if !first { // TODO: we can flatten this check out I think - converger.Pause() // FIXME: add sync wait? - resources.Pause(G, false) // sync + converger.Pause() // FIXME: add sync wait? + graph.Pause(false) // sync - //G.UnGroup() // FIXME: implement me if needed! + //graph.UnGroup() // FIXME: implement me if needed! } // make the graph from yaml, lib, puppet->yaml, or dsl! @@ -437,23 +450,20 @@ func (obj *Main) Run() error { log.Printf("Main: Error creating new graph: %v", err) // unpause! if !first { - resources.Start(G, first) // sync - converger.Start() // after G.Start() + graph.Start(first) // sync + converger.Start() // after Start() } continue } - newGraph.SetValue("debug", obj.Flags.Debug) - // pass in the information we need - associateData(newGraph, &resources.Data{ - Hostname: hostname, - Converger: converger, - Prometheus: prom, - World: world, - Prefix: pgraphPrefix, - Debug: obj.Flags.Debug, - }) - - for _, m := range graphMetas(newGraph) { + if obj.Flags.Debug { + log.Printf("Main: New Graph: %v", newGraph) + } + + // this edits the paused vertices, but it is safe to do + // so even if we don't use this new graph, since those + // value should be the same for existing vertices... + for _, v := range newGraph.Vertices() { + m := resources.VtoR(v).Meta() // apply the global noop parameter if requested if obj.Noop { m.Noop = obj.Noop @@ -466,48 +476,59 @@ func (obj *Main) Run() error { } } - // FIXME: make sure we "UnGroup()" any semi-destructive - // changes to the resources so our efficient GraphSync - // will be able to re-use and cmp to the old graph. + // We don't have to "UnGroup()" to compare, since we + // save the old graph to use when we compare. + // TODO: Does this hurt performance or graph changes ? log.Printf("Main: GraphSync...") - newFullGraph, err := resources.GraphSync(newGraph, oldGraph) - if err != nil { + vertexCmpFn := func(v1, v2 pgraph.Vertex) (bool, error) { + return resources.VtoR(v1).Compare(resources.VtoR(v2)), nil + } + vertexAddFn := func(v pgraph.Vertex) error { + err := resources.VtoR(v).Validate() + return errwrap.Wrapf(err, "could not Validate() resource") + } + vertexRemoveFn := func(v pgraph.Vertex) error { + // wait for exit before starting new graph! + resources.VtoR(v).Exit() // sync + return nil + } + // on success, this updates the receiver graph... + if err := oldGraph.GraphSync(newGraph, vertexCmpFn, vertexAddFn, vertexRemoveFn); err != nil { log.Printf("Main: Error running graph sync: %v", err) // unpause! if !first { - resources.Start(G, first) // sync - converger.Start() // after Start(G) + graph.Start(first) // sync + converger.Start() // after Start() } continue } - oldGraph = newFullGraph // save old graph - G = oldGraph.Copy() // copy to active graph + graph.Update(oldGraph) // copy in structure of new graph - resources.AutoEdges(G) // add autoedges; modifies the graph - resources.AutoGroup(G, &resources.NonReachabilityGrouper{}) // run autogroup; modifies the graph + resources.AutoEdges(graph.Graph) // add autoedges; modifies the graph + resources.AutoGroup(graph.Graph, &resources.NonReachabilityGrouper{}) // run autogroup; modifies the graph // TODO: do we want to do a transitive reduction? // FIXME: run a type checker that verifies all the send->recv relationships - // Call this here because at this point the graph does not - // know anything about the prometheus instance. + // Call this here because at this point the graph does + // not know anything about the prometheus instance. 
if err := prom.UpdatePgraphStartTime(); err != nil { log.Printf("Main: Prometheus.UpdatePgraphStartTime() errored: %v", err) } - // Start(G) needs to be synchronous or wait, + // Start() needs to be synchronous or wait, // because if half of the nodes are started and // some are not ready yet and the EtcdWatch - // loops, we'll cause Pause(G) before we + // loops, we'll cause Pause() before we // even got going, thus causing nil pointer errors - resources.Start(G, first) // sync - converger.Start() // after Start(G) + graph.Start(first) // sync + converger.Start() // after Start() - log.Printf("Main: Graph: %v", G) // show graph + log.Printf("Main: Graph: %v", graph) // show graph if obj.Graphviz != "" { filter := obj.GraphvizFilter if filter == "" { filter = "dot" // directed graph default } - if err := G.ExecGraphviz(filter, obj.Graphviz, hostname); err != nil { + if err := graph.ExecGraphviz(filter, obj.Graphviz, hostname); err != nil { log.Printf("Main: Graphviz: %v", err) } else { log.Printf("Main: Graphviz: Successfully generated graph!") @@ -590,7 +611,7 @@ func (obj *Main) Run() error { // tell inner main loop to exit close(exitchan) - resources.Exit(G) // tells all the children to exit, and waits for them to do so + graph.Exit() // tells all the children to exit, and waits for them to do so // cleanup etcd main loop last so it can process everything first if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd @@ -607,31 +628,10 @@ func (obj *Main) Run() error { } if obj.Flags.Debug { - log.Printf("Main: Graph: %v", G) + log.Printf("Main: Graph: %v", graph) } // TODO: wait for each vertex to exit... log.Println("Goodbye!") return reterr } - -// graphMetas returns a list of pointers to each of the resource MetaParams. -func graphMetas(g *pgraph.Graph) []*resources.MetaParams { - metas := []*resources.MetaParams{} - for _, v := range g.Vertices() { // loop through the vertices (resources) - res := resources.VtoR(v) // resource - meta := res.Meta() - metas = append(metas, meta) - } - return metas -} - -// associateData associates some data with the object in the graph in question. -func associateData(g *pgraph.Graph, data *resources.Data) { - // prometheus needs to be associated to this graph as well - g.SetValue("prometheus", data.Prometheus) - - for _, v := range g.Vertices() { - *resources.VtoR(v).Data() = *data - } -} diff --git a/pgraph/graphsync.go b/pgraph/graphsync.go new file mode 100644 index 0000000000..bd30fd115f --- /dev/null +++ b/pgraph/graphsync.go @@ -0,0 +1,124 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgraph + +import ( + "fmt" + + errwrap "github.com/pkg/errors" +) + +// GraphSync updates the Graph so that it matches the newGraph. 
It leaves +// identical elements alone so that they don't need to be refreshed. +// It tries to mutate existing elements into new ones, if they support this. +// This updates the Graph on success only. +// FIXME: should we do this with copies of the vertex resources? +// FIXME: add test cases +func (obj *Graph) GraphSync(newGraph *Graph, vertexCmpFn func(Vertex, Vertex) (bool, error), vertexAddFn func(Vertex) error, vertexRemoveFn func(Vertex) error) error { + + oldGraph := obj.Copy() // work on a copy of the old graph + if oldGraph == nil { + var err error + oldGraph, err = NewGraph(newGraph.GetName()) // copy over the name + if err != nil { + return errwrap.Wrapf(err, "GraphSync failed") + } + } + oldGraph.SetName(newGraph.GetName()) // overwrite the name + + var lookup = make(map[Vertex]Vertex) + var vertexKeep []Vertex // list of vertices which are the same in new graph + var edgeKeep []*Edge // list of vertices which are the same in new graph + + for v := range newGraph.Adjacency() { // loop through the vertices (resources) + var vertex Vertex + // step one, direct compare with res.Compare + if vertex == nil { // redundant guard for consistency + fn := func(vv Vertex) (bool, error) { + b, err := vertexCmpFn(vv, v) + return b, errwrap.Wrapf(err, "vertexCmpFn failed") + } + var err error + vertex, err = oldGraph.VertexMatchFn(fn) + if err != nil { + return errwrap.Wrapf(err, "VertexMatchFn failed") + } + } + + // TODO: consider adding a mutate API. + // step two, try and mutate with res.Mutate + //if vertex == nil { // not found yet... + // vertex = oldGraph.MutateMatch(res) + //} + + if vertex == nil { // no match found yet + if err := vertexAddFn(v); err != nil { + return errwrap.Wrapf(err, "vertexAddFn failed") + } + vertex = v + oldGraph.AddVertex(vertex) // call standalone in case not part of an edge + } + lookup[v] = vertex // used for constructing edges + vertexKeep = append(vertexKeep, vertex) // append + } + + // get rid of any vertices we shouldn't keep (that aren't in new graph) + for v := range oldGraph.Adjacency() { + if !VertexContains(v, vertexKeep) { + if err := vertexRemoveFn(v); err != nil { + return errwrap.Wrapf(err, "vertexRemoveFn failed") + } + oldGraph.DeleteVertex(v) + } + } + + // compare edges + for v1 := range newGraph.Adjacency() { // loop through the vertices (resources) + for v2, e := range newGraph.Adjacency()[v1] { + // we have an edge! + // lookup vertices (these should exist now) + vertex1, exists1 := lookup[v1] + vertex2, exists2 := lookup[v2] + if !exists1 || !exists2 { // no match found, bug? + //if vertex1 == nil || vertex2 == nil { // no match found + return fmt.Errorf("new vertices weren't found") // programming error + } + + edge, exists := oldGraph.Adjacency()[vertex1][vertex2] + if !exists || edge.Name != e.Name { // TODO: edgeCmp + edge = e // use or overwrite edge + } + oldGraph.Adjacency()[vertex1][vertex2] = edge // store it (AddEdge) + edgeKeep = append(edgeKeep, edge) // mark as saved + } + } + + // delete unused edges + for v1 := range oldGraph.Adjacency() { + for _, e := range oldGraph.Adjacency()[v1] { + // we have an edge! + if !EdgeContains(e, edgeKeep) { + oldGraph.DeleteEdge(e) + } + } + } + + // success + *obj = *oldGraph // save old graph + return nil +} diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 431fb52376..acb21ba120 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -54,12 +54,12 @@ type Edge struct { // Init initializes the graph which populates all the internal structures. 
func (g *Graph) Init() error { - if g.Name == "" { + if g.Name == "" { // FIXME: is this really a good requirement? return fmt.Errorf("can't initialize graph with empty name") } - g.adjacency = make(map[Vertex]map[Vertex]*Edge) - g.kv = make(map[string]interface{}) + //g.adjacency = make(map[Vertex]map[Vertex]*Edge) // not required + //g.kv = make(map[string]interface{}) // not required return nil } @@ -106,11 +106,17 @@ func (g *Graph) Value(key string) (interface{}, bool) { // SetValue sets a value to be stored alongside the graph in a particular key. func (g *Graph) SetValue(key string, val interface{}) { + if g.kv == nil { // initialize on first use + g.kv = make(map[string]interface{}) + } g.kv[key] = val } // Copy makes a copy of the graph struct. func (g *Graph) Copy() *Graph { + if g == nil { // allow nil graphs through + return g + } newGraph := &Graph{ Name: g.Name, adjacency: make(map[Vertex]map[Vertex]*Edge, len(g.adjacency)), @@ -134,6 +140,9 @@ func (g *Graph) SetName(name string) { // AddVertex uses variadic input to add all listed vertices to the graph. func (g *Graph) AddVertex(xv ...Vertex) { + if g.adjacency == nil { // initialize on first use + g.adjacency = make(map[Vertex]map[Vertex]*Edge) + } for _, v := range xv { if _, exists := g.adjacency[v]; !exists { g.adjacency[v] = make(map[Vertex]*Edge) diff --git a/pgraph/pgraph_test.go b/pgraph/pgraph_test.go index 86723f2254..4ba81897f6 100644 --- a/pgraph/pgraph_test.go +++ b/pgraph/pgraph_test.go @@ -40,7 +40,7 @@ func NV(s string) Vertex { func TestPgraphT1(t *testing.T) { - G, _ := NewGraph("g1") + G := &Graph{} if i := G.NumVertices(); i != 0 { t.Errorf("should have 0 vertices instead of: %d", i) @@ -66,7 +66,7 @@ func TestPgraphT1(t *testing.T) { func TestPgraphT2(t *testing.T) { - G, _ := NewGraph("g2") + G := &Graph{Name: "g2"} v1 := NV("v1") v2 := NV("v2") v3 := NV("v3") @@ -623,3 +623,21 @@ func TestPgraphT11(t *testing.T) { t.Errorf("reverse of vertex slice failed (6..1)") } } + +func TestPgraphCopy1(t *testing.T) { + g1 := &Graph{} + g2 := g1.Copy() // check this doesn't panic + if !reflect.DeepEqual(g1.String(), g2.String()) { + t.Errorf("graph copy failed") + } +} + +func TestPgraphDelete1(t *testing.T) { + G := &Graph{} + v1 := NV("v1") + G.DeleteVertex(v1) // check this doesn't panic + + if i := G.NumVertices(); i != 0 { + t.Errorf("should have 0 vertices instead of: %d", i) + } +} diff --git a/resources/actions.go b/resources/actions.go index d0155fdeed..2a58685e19 100644 --- a/resources/actions.go +++ b/resources/actions.go @@ -35,17 +35,27 @@ import ( "golang.org/x/time/rate" ) +// SentinelErr is a sentinal as an error type that wraps an arbitrary error. +type SentinelErr struct { + err error +} + +// Error is the required method to fulfill the error type. +func (obj *SentinelErr) Error() string { + return obj.err.Error() +} + // OKTimestamp returns true if this element can run right now? -func OKTimestamp(g *pgraph.Graph, v pgraph.Vertex) bool { +func (obj *BaseRes) OKTimestamp() bool { // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphVertices(v) { + for _, n := range obj.Graph.IncomingGraphVertices(obj.Vertex) { // if the vertex has a greater timestamp than any pre-req (n) // then we can't run right now... // if they're equal (eg: on init of 0) then we also can't run // b/c we should let our pre-req's go first... 
- x, y := VtoR(v).Timestamp(), VtoR(n).Timestamp() - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: OKTimestamp: (%v) >= %s(%v): !%v", VtoR(v).String(), x, VtoR(n).String(), y, x >= y) + x, y := obj.Timestamp(), VtoR(n).Timestamp() + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: OKTimestamp: (%v) >= %s(%v): !%v", obj, x, n, y, x >= y) } if x >= y { return false @@ -55,36 +65,35 @@ func OKTimestamp(g *pgraph.Graph, v pgraph.Vertex) bool { } // Poke tells nodes after me in the dependency graph that they need to refresh. -func Poke(g *pgraph.Graph, v pgraph.Vertex) error { - +func (obj *BaseRes) Poke() error { // if we're pausing (or exiting) then we should suspend poke's so that // the graph doesn't go on running forever until it's completely done! // this is an optional feature which we can do by default on user exit - if b, ok := g.Value("fastpause"); ok && util.Bool(b) { + if b, ok := obj.Graph.Value("fastpause"); ok && util.Bool(b) { return nil // TODO: should this be an error instead? } var wg sync.WaitGroup // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphVertices(v) { + for _, n := range obj.Graph.OutgoingGraphVertices(obj.Vertex) { // we can skip this poke if resource hasn't done work yet... it // needs to be poked if already running, or not running though! // TODO: does this need an || activity flag? if VtoR(n).GetState() != ResStateProcess { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: Poke: %s", VtoR(v).String(), VtoR(n).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: Poke: %s", obj, n) } wg.Add(1) go func(nn pgraph.Vertex) error { defer wg.Done() - //edge := g.adjacency[v][nn] // lookup + //edge := obj.Graph.adjacency[v][nn] // lookup //notify := edge.Notify && edge.Refresh() return VtoR(nn).SendEvent(event.EventPoke, nil) }(n) } else { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: Poke: %s: Skipped!", VtoR(v).String(), VtoR(n).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: Poke: %s: Skipped!", obj, n) } } } @@ -94,11 +103,11 @@ func Poke(g *pgraph.Graph, v pgraph.Vertex) error { } // BackPoke pokes the pre-requisites that are stale and need to run before I can run. -func BackPoke(g *pgraph.Graph, v pgraph.Vertex) { +func (obj *BaseRes) BackPoke() { var wg sync.WaitGroup // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphVertices(v) { - x, y, s := VtoR(v).Timestamp(), VtoR(n).Timestamp(), VtoR(n).GetState() + for _, n := range obj.Graph.IncomingGraphVertices(obj.Vertex) { + x, y, s := obj.Timestamp(), VtoR(n).Timestamp(), VtoR(n).GetState() // If the parent timestamp needs poking AND it's not running // Process, then poke it. 
If the parent is in ResStateProcess it // means that an event is pending, so we'll be expecting a poke @@ -106,8 +115,8 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) { // TODO: implement a stateLT (less than) to tell if something // happens earlier in the state cycle and that doesn't wrap nil if x >= y && (s != ResStateProcess && s != ResStateCheckApply) { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: BackPoke: %s", VtoR(v).String(), VtoR(n).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: BackPoke: %s", obj, n) } wg.Add(1) go func(nn pgraph.Vertex) error { @@ -116,8 +125,8 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) { }(n) } else { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: BackPoke: %s: Skipped!", VtoR(v).String(), VtoR(n).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: BackPoke: %s: Skipped!", obj, n) } } } @@ -127,9 +136,9 @@ func BackPoke(g *pgraph.Graph, v pgraph.Vertex) { // RefreshPending determines if any previous nodes have a refresh pending here. // If this is true, it means I am expected to apply a refresh when I next run. -func RefreshPending(g *pgraph.Graph, v pgraph.Vertex) bool { +func (obj *BaseRes) RefreshPending() bool { var refresh bool - for _, edge := range g.IncomingGraphEdges(v) { + for _, edge := range obj.Graph.IncomingGraphEdges(obj.Vertex) { // if we asked for a notify *and* if one is pending! if edge.Notify && edge.Refresh() { refresh = true @@ -140,8 +149,8 @@ func RefreshPending(g *pgraph.Graph, v pgraph.Vertex) bool { } // SetUpstreamRefresh sets the refresh value to any upstream vertices. -func SetUpstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) { - for _, edge := range g.IncomingGraphEdges(v) { +func (obj *BaseRes) SetUpstreamRefresh(b bool) { + for _, edge := range obj.Graph.IncomingGraphEdges(obj.Vertex) { if edge.Notify { edge.SetRefresh(b) } @@ -149,8 +158,8 @@ func SetUpstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) { } // SetDownstreamRefresh sets the refresh value to any downstream vertices. -func SetDownstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) { - for _, edge := range g.OutgoingGraphEdges(v) { +func (obj *BaseRes) SetDownstreamRefresh(b bool) { + for _, edge := range obj.Graph.OutgoingGraphEdges(obj.Vertex) { // if we asked for a notify *and* if one is pending! if edge.Notify { edge.SetRefresh(b) @@ -159,10 +168,9 @@ func SetDownstreamRefresh(g *pgraph.Graph, v pgraph.Vertex, b bool) { } // Process is the primary function to execute for a particular vertex in the graph. -func Process(g *pgraph.Graph, v pgraph.Vertex) error { - obj := VtoR(v) - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s[%s]: Process()", obj.GetKind(), obj.GetName()) +func (obj *BaseRes) Process() error { + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { // XXX + log.Printf("%s: Process()", obj) } // FIXME: should these SetState methods be here or after the sema code? defer obj.SetState(ResStateNil) // reset state when finished @@ -171,13 +179,13 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! - if !OKTimestamp(g, v) { - go BackPoke(g, v) + if !obj.OKTimestamp() { + go obj.BackPoke() return nil } // timestamp must be okay... 
- if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s[%s]: OKTimestamp(%v)", obj.GetKind(), obj.GetName(), VtoR(v).Timestamp()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: OKTimestamp(%v)", obj, obj.Timestamp()) } // semaphores! @@ -188,23 +196,23 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { // The exception is that semaphores with a zero count will always block! // TODO: Add a close mechanism to close/unblock zero count semaphores... semas := obj.Meta().Sema - if b, ok := g.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { - log.Printf("%s[%s]: Sema: P(%s)", obj.GetKind(), obj.GetName(), strings.Join(semas, ", ")) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { + log.Printf("%s: Sema: P(%s)", obj, strings.Join(semas, ", ")) } - if err := SemaLock(g, semas); err != nil { // lock + if err := SemaLock(obj.Graph, semas); err != nil { // lock // NOTE: in practice, this might not ever be truly necessary... return fmt.Errorf("shutdown of semaphores") } - defer SemaUnlock(g, semas) // unlock - if b, ok := g.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { - defer log.Printf("%s[%s]: Sema: V(%s)", obj.GetKind(), obj.GetName(), strings.Join(semas, ", ")) + defer SemaUnlock(obj.Graph, semas) // unlock + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { + defer log.Printf("%s: Sema: V(%s)", obj, strings.Join(semas, ", ")) } var ok = true var applied = false // did we run an apply? // connect any senders to receivers and detect if values changed - if updated, err := obj.SendRecv(obj); err != nil { + if updated, err := obj.SendRecv(obj.Res); err != nil { return errwrap.Wrapf(err, "could not SendRecv in Process") } else if len(updated) > 0 { for _, changed := range updated { @@ -220,12 +228,12 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { var checkOK bool var err error - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s[%s]: CheckApply(%t)", obj.GetKind(), obj.GetName(), !noop) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: CheckApply(%t)", obj, !noop) } // lookup the refresh (notification) variable - refresh = RefreshPending(g, v) // do i need to perform a refresh? + refresh = obj.RefreshPending() // do i need to perform a refresh? obj.SetRefresh(refresh) // tell the resource // changes can occur after this... @@ -244,38 +252,37 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { // run the CheckApply! } else { // if this fails, don't UpdateTimestamp() - checkOK, err = obj.CheckApply(!noop) + checkOK, err = obj.Res.CheckApply(!noop) - if promErr := obj.Prometheus().UpdateCheckApplyTotal(obj.GetKind(), !noop, !checkOK, err != nil); promErr != nil { + if promErr := obj.Data().Prometheus.UpdateCheckApplyTotal(obj.GetKind(), !noop, !checkOK, err != nil); promErr != nil { // TODO: how to error correctly - log.Printf("%s: Prometheus.UpdateCheckApplyTotal() errored: %v", VtoR(v).String(), err) + log.Printf("%s: Prometheus.UpdateCheckApplyTotal() errored: %v", obj, err) } // TODO: Can the `Poll` converged timeout tracking be a // more general method for all converged timeouts? this // would simplify the resources by removing boilerplate - if VtoR(v).Meta().Poll > 0 { + if obj.Meta().Poll > 0 { if !checkOK { // something changed, restart timer - cuid, _, _ := VtoR(v).ConvergerUIDs() // get the converger uid used to report status - cuid.ResetTimer() // activity! 
- if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s[%s]: Converger: ResetTimer", obj.GetKind(), obj.GetName()) + obj.cuid.ResetTimer() // activity! + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: Converger: ResetTimer", obj) } } } } if checkOK && err != nil { // should never return this way - log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.GetKind(), obj.GetName(), checkOK, err) + log.Fatalf("%s: CheckApply(): %t, %+v", obj, checkOK, err) } - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s[%s]: CheckApply(): %t, %v", obj.GetKind(), obj.GetName(), checkOK, err) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: CheckApply(): %t, %v", obj, checkOK, err) } // if CheckApply ran without noop and without error, state should be good if !noop && err == nil { // aka !noop || checkOK obj.StateOK(true) // reset if refresh { - SetUpstreamRefresh(g, v, false) // refresh happened, clear the request + obj.SetUpstreamRefresh(false) // refresh happened, clear the request obj.SetRefresh(false) } } @@ -301,14 +308,14 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { } if activity { // add refresh flag to downstream edges... - SetDownstreamRefresh(g, v, true) + obj.SetDownstreamRefresh(true) } // update this timestamp *before* we poke or the poked // nodes might fail due to having a too old timestamp! - VtoR(v).UpdateTimestamp() // this was touched... + obj.UpdateTimestamp() // this was touched... obj.SetState(ResStatePoking) // can't cancel parent poke - if err := Poke(g, v); err != nil { + if err := obj.Poke(); err != nil { return errwrap.Wrapf(err, "the Poke() failed") } } @@ -316,24 +323,11 @@ func Process(g *pgraph.Graph, v pgraph.Vertex) error { return errwrap.Wrapf(err, "could not Process() successfully") } -// SentinelErr is a sentinal as an error type that wraps an arbitrary error. -type SentinelErr struct { - err error -} - -// Error is the required method to fulfill the error type. -func (obj *SentinelErr) Error() string { - return obj.err.Error() -} - // innerWorker is the CheckApply runner that reads from processChan. -// TODO: would it be better if this was a method on BaseRes that took in *pgraph.Graph? -func innerWorker(g *pgraph.Graph, v pgraph.Vertex) { - obj := VtoR(v) +func (obj *BaseRes) innerWorker() { running := false done := make(chan struct{}) - playback := false // do we need to run another one? - _, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process) + playback := false // do we need to run another one? 
waiting := false var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration @@ -341,9 +335,9 @@ func innerWorker(g *pgraph.Graph, v pgraph.Vertex) { <-timer.C // unnecessary, shouldn't happen } - var delay = time.Duration(VtoR(v).Meta().Delay) * time.Millisecond - var retry = VtoR(v).Meta().Retry // number of tries left, -1 for infinite - var limiter = rate.NewLimiter(VtoR(v).Meta().Limit, VtoR(v).Meta().Burst) + var delay = time.Duration(obj.Meta().Delay) * time.Millisecond + var retry = obj.Meta().Retry // number of tries left, -1 for infinite + var limiter = rate.NewLimiter(obj.Meta().Limit, obj.Meta().Burst) limited := false wg := &sync.WaitGroup{} // wait for Process routine to exit @@ -351,49 +345,49 @@ func innerWorker(g *pgraph.Graph, v pgraph.Vertex) { Loop: for { select { - case ev, ok := <-obj.ProcessChan(): // must use like this + case ev, ok := <-obj.processChan: // must use like this if !ok { // processChan closed, let's exit break Loop // no event, so no ack! } - if VtoR(v).Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) + if obj.Meta().Poll == 0 { // skip for polling + obj.wcuid.SetConverged(false) } // if process started, but no action yet, skip! - if VtoR(v).GetState() == ResStateProcess { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: Skipped event!", VtoR(v).String()) + if obj.GetState() == ResStateProcess { + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: Skipped event!", obj) } ev.ACK() // ready for next message - VtoR(v).QuiesceGroup().Done() + obj.quiesceGroup.Done() continue } // if running, we skip running a new execution! // if waiting, we skip running a new execution! if running || waiting { - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: Playback added!", VtoR(v).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: Playback added!", obj) } playback = true ev.ACK() // ready for next message - VtoR(v).QuiesceGroup().Done() + obj.quiesceGroup.Done() continue } // catch invalid rates - if VtoR(v).Meta().Burst == 0 && !(VtoR(v).Meta().Limit == rate.Inf) { // blocked - e := fmt.Errorf("%s: Permanently limited (rate != Inf, burst: 0)", VtoR(v).String()) + if obj.Meta().Burst == 0 && !(obj.Meta().Limit == rate.Inf) { // blocked + e := fmt.Errorf("%s: Permanently limited (rate != Inf, burst: 0)", obj) ev.ACK() // ready for next message - VtoR(v).QuiesceGroup().Done() - VtoR(v).SendEvent(event.EventExit, &SentinelErr{e}) + obj.quiesceGroup.Done() + obj.SendEvent(event.EventExit, &SentinelErr{e}) continue } // rate limit // FIXME: consider skipping rate limit check if // the event is a poke instead of a watch event - if !limited && !(VtoR(v).Meta().Limit == rate.Inf) { // skip over the playback event... + if !limited && !(obj.Meta().Limit == rate.Inf) { // skip over the playback event... now := time.Now() r := limiter.ReserveN(now, 1) // one event // r.OK() seems to always be true here! @@ -401,12 +395,12 @@ Loop: if d > 0 { // delay limited = true playback = true - log.Printf("%s: Limited (rate: %v/sec, burst: %d, next: %v)", VtoR(v).String(), VtoR(v).Meta().Limit, VtoR(v).Meta().Burst, d) + log.Printf("%s: Limited (rate: %v/sec, burst: %d, next: %v)", obj, obj.Meta().Limit, obj.Meta().Burst, d) // start the timer... timer.Reset(d) waiting = true // waiting for retry timer ev.ACK() - VtoR(v).QuiesceGroup().Done() + obj.quiesceGroup.Done() continue } // otherwise, we run directly! 
} @@ -415,60 +409,60 @@ Loop: wg.Add(1) running = true go func(ev *event.Event) { - pcuid.SetConverged(false) // "block" Process + obj.pcuid.SetConverged(false) // "block" Process defer wg.Done() - if e := Process(g, v); e != nil { + if e := obj.Process(); e != nil { playback = true - log.Printf("%s: CheckApply errored: %v", VtoR(v).String(), e) + log.Printf("%s: CheckApply errored: %v", obj, e) if retry == 0 { - if err := obj.Prometheus().UpdateState(VtoR(v).String(), VtoR(v).GetKind(), prometheus.ResStateHardFail); err != nil { + if err := obj.Data().Prometheus.UpdateState(obj.String(), obj.GetKind(), prometheus.ResStateHardFail); err != nil { // TODO: how to error this? - log.Printf("%s: Prometheus.UpdateState() errored: %v", VtoR(v).String(), err) + log.Printf("%s: Prometheus.UpdateState() errored: %v", obj, err) } // wrap the error in the sentinel - VtoR(v).QuiesceGroup().Done() // before the Wait that happens in SendEvent! - VtoR(v).SendEvent(event.EventExit, &SentinelErr{e}) + obj.quiesceGroup.Done() // before the Wait that happens in SendEvent! + obj.SendEvent(event.EventExit, &SentinelErr{e}) return } if retry > 0 { // don't decrement the -1 retry-- } - if err := obj.Prometheus().UpdateState(VtoR(v).String(), VtoR(v).GetKind(), prometheus.ResStateSoftFail); err != nil { + if err := obj.Data().Prometheus.UpdateState(obj.String(), obj.GetKind(), prometheus.ResStateSoftFail); err != nil { // TODO: how to error this? - log.Printf("%s: Prometheus.UpdateState() errored: %v", VtoR(v).String(), err) + log.Printf("%s: Prometheus.UpdateState() errored: %v", obj, err) } - log.Printf("%s: CheckApply: Retrying after %.4f seconds (%d left)", VtoR(v).String(), delay.Seconds(), retry) + log.Printf("%s: CheckApply: Retrying after %.4f seconds (%d left)", obj, delay.Seconds(), retry) // start the timer... timer.Reset(delay) waiting = true // waiting for retry timer - // don't VtoR(v).QuiesceGroup().Done() b/c + // don't obj.quiesceGroup.Done() b/c // the timer is running and it can exit! return } - retry = VtoR(v).Meta().Retry // reset on success - close(done) // trigger + retry = obj.Meta().Retry // reset on success + close(done) // trigger }(ev) ev.ACK() // sync (now mostly useless) case <-timer.C: - if VtoR(v).Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) + if obj.Meta().Poll == 0 { // skip for polling + obj.wcuid.SetConverged(false) } waiting = false if !timer.Stop() { //<-timer.C // blocks, docs are wrong! } - log.Printf("%s: CheckApply delay expired!", VtoR(v).String()) + log.Printf("%s: CheckApply delay expired!", obj) close(done) // a CheckApply run (with possibly retry pause) finished case <-done: - if VtoR(v).Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) + if obj.Meta().Poll == 0 { // skip for polling + obj.wcuid.SetConverged(false) } - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: CheckApply finished!", VtoR(v).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { + log.Printf("%s: CheckApply finished!", obj) } done = make(chan struct{}) // reset // re-send this event, to trigger a CheckApply() @@ -478,21 +472,21 @@ Loop: // TODO: can this experience indefinite postponement ? // see: https://github.com/golang/go/issues/11506 // pause or exit is in process if not quiescing! - if !VtoR(v).IsQuiescing() { + if !obj.quiescing { playback = false - VtoR(v).QuiesceGroup().Add(1) // lock around it, b/c still running... + obj.quiesceGroup.Add(1) // lock around it, b/c still running... 
go func() { obj.Event() // replay a new event - VtoR(v).QuiesceGroup().Done() + obj.quiesceGroup.Done() }() } } running = false - pcuid.SetConverged(true) // "unblock" Process - VtoR(v).QuiesceGroup().Done() + obj.pcuid.SetConverged(true) // "unblock" Process + obj.quiesceGroup.Done() - case <-wcuid.ConvergedTimer(): - wcuid.SetConverged(true) // converged! + case <-obj.wcuid.ConvergedTimer(): + obj.wcuid.SetConverged(true) // converged! continue } } @@ -503,22 +497,21 @@ Loop: // Worker is the common run frontend of the vertex. It handles all of the retry // and retry delay common code, and ultimately returns the final status of this // vertex execution. -func Worker(g *pgraph.Graph, v pgraph.Vertex) error { +func (obj *BaseRes) Worker() error { // listen for chan events from Watch() and run // the Process() function when they're received // this avoids us having to pass the data into // the Watch() function about which graph it is // running on, which isolates things nicely... - obj := VtoR(v) - if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("%s: Worker: Running", VtoR(v).String()) - defer log.Printf("%s: Worker: Stopped", VtoR(v).String()) + if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) { // XXX + log.Printf("%s: Worker: Running", obj) + defer log.Printf("%s: Worker: Stopped", obj) } // run the init (should match 1-1 with Close function) - if err := obj.Init(); err != nil { + if err := obj.Res.Init(); err != nil { obj.ProcessExit() // always exit the worker function by finishing with Close() - if e := obj.Close(); e != nil { + if e := obj.Res.Close(); e != nil { err = multierr.Append(err, e) // list of errors } return errwrap.Wrapf(err, "could not Init() resource") @@ -528,16 +521,15 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { // timeout, we could inappropriately converge mid-apply! // avoid this by blocking convergence with a fake report // we also add a similar blocker around the worker loop! - _, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process) // XXX: put these in Init() ? - wcuid.SetConverged(true) // starts off false, and waits for loop timeout - pcuid.SetConverged(true) // starts off true, because it's not running... + // get extra cuids (worker, process) + obj.wcuid.SetConverged(true) // starts off false, and waits for loop timeout + obj.pcuid.SetConverged(true) // starts off true, because it's not running... - wg := obj.ProcessSync() - wg.Add(1) + obj.processSync.Add(1) go func() { - defer wg.Done() - innerWorker(g, v) + defer obj.processSync.Done() + obj.innerWorker() }() var err error // propagate the error up (this is a permanent BAD error!) @@ -547,7 +539,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { // NOTE: we're using the same retry and delay metaparams that CheckApply // uses. This is for practicality. We can separate them later if needed! var watchDelay time.Duration - var watchRetry = VtoR(v).Meta().Retry // number of tries left, -1 for infinite + var watchRetry = obj.Meta().Retry // number of tries left, -1 for infinite // watch blocks until it ends, & errors to retry for { // TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!) 
@@ -570,7 +562,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { if exit, send := obj.ReadEvent(event); exit != nil { obj.ProcessExit() err := *exit // exit err - if e := obj.Close(); err == nil { + if e := obj.Res.Close(); err == nil { err = e } else if e != nil { err = multierr.Append(err, e) // list of errors @@ -600,7 +592,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { } } timer.Stop() // it's nice to cleanup - log.Printf("%s: Watch delay expired!", VtoR(v).String()) + log.Printf("%s: Watch delay expired!", obj) // NOTE: we can avoid the send if running Watch guarantees // one CheckApply event on startup! //if pendingSendEvent { // TODO: should this become a list in the future? @@ -612,13 +604,12 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { // TODO: reset the watch retry count after some amount of success var e error - if VtoR(v).Meta().Poll > 0 { // poll instead of watching :( - cuid, _, _ := VtoR(v).ConvergerUIDs() // get the converger uid used to report status - cuid.StartTimer() - e = VtoR(v).Poll() - cuid.StopTimer() // clean up nicely + if obj.Meta().Poll > 0 { // poll instead of watching :( + obj.cuid.StartTimer() + e = obj.Poll() + obj.cuid.StopTimer() // clean up nicely } else { - e = VtoR(v).Watch() // run the watch normally + e = obj.Res.Watch() // run the watch normally } if e == nil { // exit signal err = nil // clean exit @@ -628,7 +619,7 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { err = sentinelErr.err break // sentinel means, perma-exit } - log.Printf("%s: Watch errored: %v", VtoR(v).String(), e) + log.Printf("%s: Watch errored: %v", obj, e) if watchRetry == 0 { err = fmt.Errorf("Permanent watch error: %v", e) break @@ -636,8 +627,8 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { if watchRetry > 0 { // don't decrement the -1 watchRetry-- } - watchDelay = time.Duration(VtoR(v).Meta().Delay) * time.Millisecond - log.Printf("%s: Watch: Retrying after %.4f seconds (%d left)", VtoR(v).String(), watchDelay.Seconds(), watchRetry) + watchDelay = time.Duration(obj.Meta().Delay) * time.Millisecond + log.Printf("%s: Watch: Retrying after %.4f seconds (%d left)", obj, watchDelay.Seconds(), watchRetry) // We need to trigger a CheckApply after Watch restarts, so that // we catch any lost events that happened while down. We do this // by getting the Watch resource to send one event once it's up! @@ -646,148 +637,10 @@ func Worker(g *pgraph.Graph, v pgraph.Vertex) error { obj.ProcessExit() // close resource and return possible errors if any - if e := obj.Close(); err == nil { + if e := obj.Res.Close(); err == nil { err = e } else if e != nil { err = multierr.Append(err, e) // list of errors } return err } - -// Start is a main kick to start the graph. It goes through in reverse topological -// sort order so that events can't hit un-started vertices. -func Start(g *pgraph.Graph, first bool) { // start or continue - log.Printf("State: %v -> %v", setState(g, graphStateStarting), getState(g)) - defer log.Printf("State: %v -> %v", setState(g, graphStateStarted), getState(g)) - t, _ := g.TopologicalSort() - indegree := g.InDegree() // compute all of the indegree's - reversed := pgraph.Reverse(t) - wg := &sync.WaitGroup{} - for _, v := range reversed { // run the Setup() for everyone first - // run these in parallel, as long as we wait before continuing - wg.Add(1) - go func(vv pgraph.Vertex) { - defer wg.Done() - if !VtoR(vv).IsWorking() { // if Worker() is not running... 
- VtoR(vv).Setup() // initialize some vars in the resource - } - }(v) - } - wg.Wait() - - // ptr b/c: Mutex/WaitGroup must not be copied after first use - gwg := WgFromGraph(g) - - // run through the topological reverse, and start or unpause each vertex - for _, v := range reversed { - // selective poke: here we reduce the number of initial pokes - // to the minimum required to activate every vertex in the - // graph, either by direct action, or by getting poked by a - // vertex that was previously activated. if we poke each vertex - // that has no incoming edges, then we can be sure to reach the - // whole graph. Please note: this may mask certain optimization - // failures, such as any poke limiting code in Poke() or - // BackPoke(). You might want to disable this selective start - // when experimenting with and testing those elements. - // if we are unpausing (since it's not the first run of this - // function) we need to poke to *unpause* every graph vertex, - // and not just selectively the subset with no indegree. - - // let the startup code know to poke or not - // this triggers a CheckApply AFTER Watch is Running() - // We *don't* need to also do this to new nodes or nodes that - // are about to get unpaused, because they'll get poked by one - // of the indegree == 0 vertices, and an important aspect of the - // Process() function is that even if the state is correct, it - // will pass through the Poke so that it flows through the DAG. - VtoR(v).Starter(indegree[v] == 0) - - var unpause = true - if !VtoR(v).IsWorking() { // if Worker() is not running... - unpause = false // doesn't need unpausing on first start - gwg.Add(1) - // must pass in value to avoid races... - // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ - go func(vv pgraph.Vertex) { - defer gwg.Done() - defer VtoR(vv).Reset() - // TODO: if a sufficient number of workers error, - // should something be done? Should these restart - // after perma-failure if we have a graph change? - log.Printf("%s: Started", VtoR(vv).String()) - if err := Worker(g, vv); err != nil { // contains the Watch and CheckApply loops - log.Printf("%s: Exited with failure: %v", VtoR(vv).String(), err) - return - } - log.Printf("%s: Exited", VtoR(vv).String()) - }(v) - } - - select { - case <-VtoR(v).Started(): // block until started - case <-VtoR(v).Stopped(): // we failed on init - // if the resource Init() fails, we don't hang! - } - - if unpause { // unpause (if needed) - VtoR(v).SendEvent(event.EventStart, nil) // sync! - } - } - // we wait for everyone to start before exiting! -} - -// Pause sends pause events to the graph in a topological sort order. If you set -// the fastPause argument to true, then it will ask future propagation waves to -// not run through the graph before exiting, and instead will exit much quicker. -func Pause(g *pgraph.Graph, fastPause bool) { - log.Printf("State: %v -> %v", setState(g, graphStatePausing), getState(g)) - defer log.Printf("State: %v -> %v", setState(g, graphStatePaused), getState(g)) - if fastPause { - g.SetValue("fastpause", true) // set flag - } - t, _ := g.TopologicalSort() - for _, v := range t { // squeeze out the events... - VtoR(v).SendEvent(event.EventPause, nil) // sync - } - g.SetValue("fastpause", false) // reset flag -} - -// Exit sends exit events to the graph in a topological sort order. 
-func Exit(g *pgraph.Graph) { - if g == nil { // empty graph that wasn't populated yet - return - } - - // FIXME: a second ^C could put this into fast pause, but do it for now! - Pause(g, true) // implement this with pause to avoid duplicating the code - - t, _ := g.TopologicalSort() - for _, v := range t { // squeeze out the events... - // turn off the taps... - // XXX: consider instead doing this by closing the Res.events channel instead? - // XXX: do this by sending an exit signal, and then returning - // when we hit the 'default' in the select statement! - // XXX: we can do this to quiesce, but it's not necessary now - - VtoR(v).SendEvent(event.EventExit, nil) - VtoR(v).WaitGroup().Wait() - } - gwg := WgFromGraph(g) - gwg.Wait() // for now, this doesn't need to be a separate Wait() method -} - -// WgFromGraph returns a pointer to the waitgroup stored with the graph, -// otherwise it panics. If one does not exist, it will create it. -func WgFromGraph(g *pgraph.Graph) *sync.WaitGroup { - x, exists := g.Value("waitgroup") - if !exists { - g.SetValue("waitgroup", &sync.WaitGroup{}) - x, _ = g.Value("waitgroup") - } - - wg, ok := x.(*sync.WaitGroup) - if !ok { - panic("not a *sync.WaitGroup") - } - return wg -} diff --git a/resources/augeas.go b/resources/augeas.go index 72b3a88b45..398e46a803 100644 --- a/resources/augeas.go +++ b/resources/augeas.go @@ -119,7 +119,7 @@ func (obj *AugeasRes) Watch() error { for { if obj.debug { - log.Printf("%s[%s]: Watching: %s", obj.GetKind(), obj.GetName(), obj.File) // attempting to watch... + log.Printf("%s: Watching: %s", obj, obj.File) // attempting to watch... } select { @@ -128,10 +128,10 @@ func (obj *AugeasRes) Watch() error { return nil } if err := event.Error; err != nil { - return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "Unknown %s watcher error", obj) } if obj.debug { // don't access event.Body if event.Error isn't nil - log.Printf("%s[%s]: Event(%s): %v", obj.GetKind(), obj.GetName(), event.Body.Name, event.Body.Op) + log.Printf("%s: Event(%s): %v", obj, event.Body.Name, event.Body.Op) } send = true obj.StateOK(false) // dirty @@ -178,7 +178,7 @@ func (obj *AugeasRes) checkApplySet(apply bool, ag *augeas.Augeas, set AugeasSet // CheckApply method for Augeas resource. func (obj *AugeasRes) CheckApply(apply bool) (bool, error) { - log.Printf("%s[%s]: CheckApply: %s", obj.GetKind(), obj.GetName(), obj.File) + log.Printf("%s: CheckApply: %s", obj, obj.File) // By default we do not set any option to augeas, we use the defaults. 
opts := augeas.None if obj.Lens != "" { @@ -226,7 +226,7 @@ func (obj *AugeasRes) CheckApply(apply bool) (bool, error) { return checkOK, nil } - log.Printf("%s[%s]: changes needed, saving", obj.GetKind(), obj.GetName()) + log.Printf("%s: changes needed, saving", obj) if err = ag.Save(); err != nil { return false, errwrap.Wrapf(err, "augeas: error while saving augeas values") } diff --git a/resources/autoedge.go b/resources/autoedge.go index 5102bac466..f68f90992d 100644 --- a/resources/autoedge.go +++ b/resources/autoedge.go @@ -50,7 +50,7 @@ func addEdgesByMatchingUIDS(g *pgraph.Graph, v pgraph.Vertex, uids []ResUID) []b continue } if b, ok := g.Value("debug"); ok && util.Bool(b) { - log.Printf("Compile: AutoEdge: Match: %s with UID: %s[%s]", VtoR(vv).String(), uid.GetKind(), uid.GetName()) + log.Printf("Compile: AutoEdge: Match: %s with UID: %s", VtoR(vv).String(), uid) } // we must match to an effective UID for the resource, // that is to say, the name value of a res is a helpful diff --git a/resources/autogroup.go b/resources/autogroup.go index 0790ae7120..9d4a43a2c4 100644 --- a/resources/autogroup.go +++ b/resources/autogroup.go @@ -27,7 +27,7 @@ import ( errwrap "github.com/pkg/errors" ) -// AutoGrouper is the required interface to implement for an autogroup algorithm +// AutoGrouper is the required interface to implement for an autogroup algorithm. type AutoGrouper interface { // listed in the order these are typically called in... name() string // friendly identifier @@ -39,7 +39,7 @@ type AutoGrouper interface { vertexTest(bool) (bool, error) // call until false } -// baseGrouper is the base type for implementing the AutoGrouper interface +// baseGrouper is the base type for implementing the AutoGrouper interface. type baseGrouper struct { graph *pgraph.Graph // store a pointer to the graph vertices []pgraph.Vertex // cached list of vertices @@ -48,7 +48,7 @@ type baseGrouper struct { done bool } -// name provides a friendly name for the logs to see +// name provides a friendly name for the logs to see. func (ag *baseGrouper) name() string { return "baseGrouper" } @@ -150,7 +150,7 @@ func (ag *baseGrouper) edgeMerge(e1, e2 *pgraph.Edge) *pgraph.Edge { } // vertexTest processes the results of the grouping for the algorithm to know -// return an error if something went horribly wrong, and bool false to stop +// return an error if something went horribly wrong, and bool false to stop. func (ag *baseGrouper) vertexTest(b bool) (bool, error) { // NOTE: this particular baseGrouper version doesn't track what happens // because since we iterate over every pair, we don't care which merge! @@ -160,6 +160,7 @@ func (ag *baseGrouper) vertexTest(b bool) (bool, error) { return true, nil } +// NonReachabilityGrouper is the most straight-forward algorithm for grouping. // TODO: this algorithm may not be correct in all cases. replace if needed! type NonReachabilityGrouper struct { baseGrouper // "inherit" what we want, and reimplement the rest @@ -169,7 +170,7 @@ func (ag *NonReachabilityGrouper) name() string { return "NonReachabilityGrouper" } -// this algorithm relies on the observation that if there's a path from a to b, +// This algorithm relies on the observation that if there's a path from a to b, // then they *can't* be merged (b/c of the existing dependency) so therefore we // merge anything that *doesn't* satisfy this condition or that of the reverse! 
func (ag *NonReachabilityGrouper) vertexNext() (v1, v2 pgraph.Vertex, err error) { @@ -293,7 +294,7 @@ func VertexMerge(g *pgraph.Graph, v1, v2 pgraph.Vertex, vertexMergeFn func(pgrap return nil // success } -// autoGroup is the mechanical auto group "runner" that runs the interface spec +// autoGroup is the mechanical auto group "runner" that runs the interface spec. func autoGroup(g *pgraph.Graph, ag AutoGrouper) chan string { strch := make(chan string) // output log messages here go func(strch chan string) { @@ -341,7 +342,7 @@ func autoGroup(g *pgraph.Graph, ag AutoGrouper) chan string { return strch } -// AutoGroup runs the auto grouping on the graph and prints out log messages +// AutoGroup runs the auto grouping on the graph and prints out log messages. func AutoGroup(g *pgraph.Graph, ag AutoGrouper) { // receive log messages from channel... // this allows test cases to avoid printing them when they're unwanted! diff --git a/resources/exec.go b/resources/exec.go index c1c8c60fc3..2af5876682 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -148,7 +148,7 @@ func (obj *ExecRes) Watch() error { select { case text := <-bufioch: // each time we get a line of output, we loop! - log.Printf("%s[%s]: Watch output: %s", obj.GetKind(), obj.GetName(), text) + log.Printf("%s: Watch output: %s", obj, text) if text != "" { send = true obj.StateOK(false) // something made state dirty @@ -220,7 +220,7 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { } // apply portion - log.Printf("%s[%s]: Apply", obj.GetKind(), obj.GetName()) + log.Printf("%s: Apply", obj) var cmdName string var cmdArgs []string if obj.Shell == "" { @@ -288,10 +288,10 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { // would be nice, but it would require terminal log output that doesn't // interleave all the parallel parts which would mix it all up... if s := out.String(); s == "" { - log.Printf("%s[%s]: Command output is empty!", obj.GetKind(), obj.GetName()) + log.Printf("%s: Command output is empty!", obj) } else { - log.Printf("%s[%s]: Command output is:", obj.GetKind(), obj.GetName()) + log.Printf("%s: Command output is:", obj) log.Printf(out.String()) } diff --git a/resources/file.go b/resources/file.go index fc91daed56..7a1785f3f0 100644 --- a/resources/file.go +++ b/resources/file.go @@ -199,7 +199,7 @@ func (obj *FileRes) Watch() error { for { if obj.debug { - log.Printf("%s[%s]: Watching: %s", obj.GetKind(), obj.GetName(), obj.path) // attempting to watch... + log.Printf("%s: Watching: %s", obj, obj.path) // attempting to watch... } select { @@ -208,10 +208,10 @@ func (obj *FileRes) Watch() error { return nil } if err := event.Error; err != nil { - return errwrap.Wrapf(err, "unknown %s[%s] watcher error", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "unknown %s watcher error", obj) } if obj.debug { // don't access event.Body if event.Error isn't nil - log.Printf("%s[%s]: Event(%s): %v", obj.GetKind(), obj.GetName(), event.Body.Name, event.Body.Op) + log.Printf("%s: Event(%s): %v", obj, event.Body.Name, event.Body.Op) } send = true obj.StateOK(false) // dirty @@ -636,7 +636,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string) (bool, error) { // contentCheckApply performs a CheckApply for the file existence and content. 
func (obj *FileRes) contentCheckApply(apply bool) (checkOK bool, _ error) { - log.Printf("%s[%s]: contentCheckApply(%t)", obj.GetKind(), obj.GetName(), apply) + log.Printf("%s: contentCheckApply(%t)", obj, apply) if obj.State == "absent" { if _, err := os.Stat(obj.path); os.IsNotExist(err) { @@ -698,7 +698,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (checkOK bool, _ error) { // chmodCheckApply performs a CheckApply for the file permissions. func (obj *FileRes) chmodCheckApply(apply bool) (checkOK bool, _ error) { - log.Printf("%s[%s]: chmodCheckApply(%t)", obj.GetKind(), obj.GetName(), apply) + log.Printf("%s: chmodCheckApply(%t)", obj, apply) if obj.State == "absent" { // File is absent @@ -744,7 +744,7 @@ func (obj *FileRes) chmodCheckApply(apply bool) (checkOK bool, _ error) { // chownCheckApply performs a CheckApply for the file ownership. func (obj *FileRes) chownCheckApply(apply bool) (checkOK bool, _ error) { var expectedUID, expectedGID int - log.Printf("%s[%s]: chownCheckApply(%t)", obj.GetKind(), obj.GetName(), apply) + log.Printf("%s: chownCheckApply(%t)", obj, apply) if obj.State == "absent" { // File is absent or no owner specified diff --git a/resources/kv.go b/resources/kv.go index ccd0cd6cfd..bcf9f9db2c 100644 --- a/resources/kv.go +++ b/resources/kv.go @@ -113,10 +113,10 @@ func (obj *KVRes) Watch() error { return nil } if err != nil { - return errwrap.Wrapf(err, "unknown %s[%s] watcher error", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "unknown %s watcher error", obj) } if obj.Data().Debug { - log.Printf("%s[%s]: Event!", obj.GetKind(), obj.GetName()) + log.Printf("%s: Event!", obj) } send = true obj.StateOK(false) // dirty @@ -177,7 +177,7 @@ func (obj *KVRes) lessThanCheck(value string) (checkOK bool, err error) { // CheckApply method for Password resource. Does nothing, returns happy! func (obj *KVRes) CheckApply(apply bool) (checkOK bool, err error) { - log.Printf("%s[%s]: CheckApply(%t)", obj.GetKind(), obj.GetName(), apply) + log.Printf("%s: CheckApply(%t)", obj, apply) if val, exists := obj.Recv["Value"]; exists && val.Changed { // if we received on Value, and it changed, wooo, nothing to do. diff --git a/resources/mgraph.go b/resources/mgraph.go new file mode 100644 index 0000000000..73c8827dfc --- /dev/null +++ b/resources/mgraph.go @@ -0,0 +1,210 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package resources // TODO: can this be a separate package or will it break the dag? 
+ +import ( + "log" + "sync" + + "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/pgraph" +) + +//go:generate stringer -type=graphState -output=graphstate_stringer.go +type graphState uint + +const ( + graphStateNil graphState = iota + graphStateStarting + graphStateStarted + graphStatePausing + graphStatePaused +) + +// MGraph is a meta graph structure used to encapsulate a generic graph +// structure alongside some non-generic elements. +type MGraph struct { + //Graph *pgraph.Graph + *pgraph.Graph // wrap a graph, and use its methods directly + + Data *Data + Debug bool + + state graphState + // ptr b/c: Mutex/WaitGroup must not be copied after first use + mutex *sync.Mutex + wg *sync.WaitGroup +} + +// Init initializes the internal structures. +func (obj *MGraph) Init() { + obj.mutex = &sync.Mutex{} + obj.wg = &sync.WaitGroup{} +} + +// getState returns the state of the graph. This state is used for optimizing +// certain algorithms by knowing what part of processing the graph is currently +// undergoing. +func (obj *MGraph) getState() graphState { + //obj.mutex.Lock() + //defer obj.mutex.Unlock() + return obj.state +} + +// setState sets the graph state and returns the previous state. +func (obj *MGraph) setState(state graphState) graphState { + obj.mutex.Lock() + defer obj.mutex.Unlock() + prev := obj.getState() + obj.state = state + return prev +} + +// Update switches our graph structure to the new graph that we pass to it. This +// also updates any references to the old graph so that they're now correct. It +// also updates references to the Data structure that should be passed around. +func (obj *MGraph) Update(newGraph *pgraph.Graph) { + obj.Graph = newGraph.Copy() // store as new active graph + // update stored reference to graph and other values that need storing! + for _, v := range obj.Graph.Vertices() { + res := VtoR(v) // resource + *res.Data() = *obj.Data // push the data around + res.Update(obj.Graph) // update graph pointer + } +} + +// Start is a main kick to start the graph. It goes through in reverse +// topological sort order so that events can't hit un-started vertices. +func (obj *MGraph) Start(first bool) { // start or continue + log.Printf("State: %v -> %v", obj.setState(graphStateStarting), obj.getState()) + defer log.Printf("State: %v -> %v", obj.setState(graphStateStarted), obj.getState()) + t, _ := obj.Graph.TopologicalSort() + indegree := obj.Graph.InDegree() // compute all of the indegree's + reversed := pgraph.Reverse(t) + wg := &sync.WaitGroup{} + for _, v := range reversed { // run the Setup() for everyone first + // run these in parallel, as long as we wait before continuing + wg.Add(1) + go func(vertex pgraph.Vertex, res Res) { + defer wg.Done() + // TODO: can't we do this check outside of the goroutine? + if !*res.Working() { // if Worker() is not running... + // NOTE: vertex == res here, but pass in both in + // case we ever wrap the res in something before + // we store it as the vertex in the graph struct + res.Setup(obj.Graph, vertex, res) // initialize some vars in the resource + } + }(v, VtoR(v)) + } + wg.Wait() + + // run through the topological reverse, and start or unpause each vertex + for _, v := range reversed { + res := VtoR(v) + // selective poke: here we reduce the number of initial pokes + // to the minimum required to activate every vertex in the + // graph, either by direct action, or by getting poked by a + // vertex that was previously activated. 
if we poke each vertex + // that has no incoming edges, then we can be sure to reach the + // whole graph. Please note: this may mask certain optimization + // failures, such as any poke limiting code in Poke() or + // BackPoke(). You might want to disable this selective start + // when experimenting with and testing those elements. + // if we are unpausing (since it's not the first run of this + // function) we need to poke to *unpause* every graph vertex, + // and not just selectively the subset with no indegree. + + // let the startup code know to poke or not + // this triggers a CheckApply AFTER Watch is Running() + // We *don't* need to also do this to new nodes or nodes that + // are about to get unpaused, because they'll get poked by one + // of the indegree == 0 vertices, and an important aspect of the + // Process() function is that even if the state is correct, it + // will pass through the Poke so that it flows through the DAG. + res.Starter(indegree[v] == 0) + + var unpause = true + if !*res.Working() { // if Worker() is not running... + *res.Working() = true // set Worker() running flag + + unpause = false // doesn't need unpausing on first start + obj.wg.Add(1) + // must pass in value to avoid races... + // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ + go func(vv pgraph.Vertex) { + defer obj.wg.Done() + // unset Worker() running flag just before exit + defer func() { *VtoR(vv).Working() = false }() + defer VtoR(vv).Reset() + // TODO: if a sufficient number of workers error, + // should something be done? Should these restart + // after perma-failure if we have a graph change? + log.Printf("%s: Started", vv) + if err := VtoR(vv).Worker(); err != nil { // contains the Watch and CheckApply loops + log.Printf("%s: Exited with failure: %v", vv, err) + return + } + log.Printf("%s: Exited", vv) + }(v) + } + + select { + case <-res.Started(): // block until started + case <-res.Stopped(): // we failed on init + // if the resource Init() fails, we don't hang! + } + + if unpause { // unpause (if needed) + res.SendEvent(event.EventStart, nil) // sync! + } + } + // we wait for everyone to start before exiting! +} + +// Pause sends pause events to the graph in a topological sort order. If you set +// the fastPause argument to true, then it will ask future propagation waves to +// not run through the graph before exiting, and instead will exit much quicker. +func (obj *MGraph) Pause(fastPause bool) { + log.Printf("State: %v -> %v", obj.setState(graphStatePausing), obj.getState()) + defer log.Printf("State: %v -> %v", obj.setState(graphStatePaused), obj.getState()) + if fastPause { + obj.Graph.SetValue("fastpause", true) // set flag + } + t, _ := obj.Graph.TopologicalSort() + for _, v := range t { // squeeze out the events... + VtoR(v).SendEvent(event.EventPause, nil) // sync + } + obj.Graph.SetValue("fastpause", false) // reset flag +} + +// Exit sends exit events to the graph in a topological sort order. +func (obj *MGraph) Exit() { + if obj.Graph == nil { // empty graph that wasn't populated yet + return + } + + // FIXME: a second ^C could put this into fast pause, but do it for now! + obj.Pause(true) // implement this with pause to avoid duplicating the code + + t, _ := obj.Graph.TopologicalSort() + for _, v := range t { // squeeze out the events... + // turn off the taps... 
+ VtoR(v).Exit() // sync + } + obj.wg.Wait() // for now, this doesn't need to be a separate Wait() method +} diff --git a/resources/msg.go b/resources/msg.go index 204eecc98d..32142fc745 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -168,7 +168,7 @@ func (obj *MsgRes) CheckApply(apply bool) (bool, error) { } if !obj.logStateOK { - log.Printf("%s[%s]: Body: %s", obj.GetKind(), obj.GetName(), obj.Body) + log.Printf("%s: Body: %s", obj, obj.Body) obj.logStateOK = true obj.updateStateOK() } diff --git a/resources/noop.go b/resources/noop.go index 0568cf3fc8..3e136e151a 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -83,7 +83,7 @@ func (obj *NoopRes) Watch() error { // CheckApply method for Noop resource. Does nothing, returns happy! func (obj *NoopRes) CheckApply(apply bool) (checkOK bool, err error) { if obj.Refresh() { - log.Printf("%s[%s]: Received a notification!", obj.GetKind(), obj.GetName()) + log.Printf("%s: Received a notification!", obj) } return true, nil // state is always okay } diff --git a/resources/nspawn.go b/resources/nspawn.go index ce77f1b2d3..3ef97eae2a 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -134,11 +134,11 @@ func (obj *NspawnRes) Watch() error { case event := <-buschan: // process org.freedesktop.machine1 events for this resource's name if event.Body[0] == obj.GetName() { - log.Printf("%s[%s]: Event received: %v", obj.GetKind(), obj.GetName(), event.Name) + log.Printf("%s: Event received: %v", obj, event.Name) if event.Name == machineNew { - log.Printf("%s[%s]: Machine started", obj.GetKind(), obj.GetName()) + log.Printf("%s: Machine started", obj) } else if event.Name == machineRemoved { - log.Printf("%s[%s]: Machine stopped", obj.GetKind(), obj.GetName()) + log.Printf("%s: Machine stopped", obj) } else { return fmt.Errorf("unknown event: %s", event.Name) } @@ -195,13 +195,13 @@ func (obj *NspawnRes) CheckApply(apply bool) (checkOK bool, err error) { } } if obj.debug { - log.Printf("%s[%s]: properties: %v", obj.GetKind(), obj.GetName(), properties) + log.Printf("%s: properties: %v", obj, properties) } // if the machine doesn't exist and is supposed to // be stopped or the state matches we're done if !exists && obj.State == stopped || properties["State"] == obj.State { if obj.debug { - log.Printf("%s[%s]: CheckApply() in valid state", obj.GetKind(), obj.GetName()) + log.Printf("%s: CheckApply() in valid state", obj) } return true, nil } @@ -212,12 +212,12 @@ func (obj *NspawnRes) CheckApply(apply bool) (checkOK bool, err error) { } if obj.debug { - log.Printf("%s[%s]: CheckApply() applying '%s' state", obj.GetKind(), obj.GetName(), obj.State) + log.Printf("%s: CheckApply() applying '%s' state", obj, obj.State) } if obj.State == running { // start the machine using svc resource - log.Printf("%s[%s]: Starting machine", obj.GetKind(), obj.GetName()) + log.Printf("%s: Starting machine", obj) // assume state had to be changed at this point, ignore checkOK if _, err := obj.svc.CheckApply(apply); err != nil { return false, errwrap.Wrapf(err, "nested svc failed") @@ -226,7 +226,7 @@ func (obj *NspawnRes) CheckApply(apply bool) (checkOK bool, err error) { if obj.State == stopped { // terminate the machine with // org.freedesktop.machine1.Manager.KillMachine - log.Printf("%s[%s]: Stopping machine", obj.GetKind(), obj.GetName()) + log.Printf("%s: Stopping machine", obj) if err := conn.TerminateMachine(obj.GetName()); err != nil { return false, errwrap.Wrapf(err, "failed to stop machine") } diff --git a/resources/password.go 
b/resources/password.go index 6d7c9e20b1..fc6ac7814d 100644 --- a/resources/password.go +++ b/resources/password.go @@ -188,7 +188,7 @@ func (obj *PasswordRes) Watch() error { return nil } if err := event.Error; err != nil { - return errwrap.Wrapf(err, "unknown %s[%s] watcher error", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "unknown %s watcher error", obj) } send = true obj.StateOK(false) // dirty @@ -229,7 +229,7 @@ func (obj *PasswordRes) CheckApply(apply bool) (checkOK bool, err error) { if !obj.CheckRecovery { return false, errwrap.Wrapf(err, "check failed") } - log.Printf("%s[%s]: Integrity check failed", obj.GetKind(), obj.GetName()) + log.Printf("%s: Integrity check failed", obj) generate = true // okay to build a new one write = true // make sure to write over the old one } @@ -263,7 +263,7 @@ func (obj *PasswordRes) CheckApply(apply bool) (checkOK bool, err error) { } // generate the actual password var err error - log.Printf("%s[%s]: Generating new password...", obj.GetKind(), obj.GetName()) + log.Printf("%s: Generating new password...", obj) if password, err = obj.generate(); err != nil { // generate one! return false, errwrap.Wrapf(err, "could not generate password") } @@ -280,7 +280,7 @@ func (obj *PasswordRes) CheckApply(apply bool) (checkOK bool, err error) { output = password } // write either an empty token, or the password - log.Printf("%s[%s]: Writing password token...", obj.GetKind(), obj.GetName()) + log.Printf("%s: Writing password token...", obj) if _, err := obj.write(output); err != nil { return false, errwrap.Wrapf(err, "can't write to file") } diff --git a/resources/pgraph.go b/resources/pgraph.go deleted file mode 100644 index ca18ef3e87..0000000000 --- a/resources/pgraph.go +++ /dev/null @@ -1,189 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2017+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package resources - -import ( - "fmt" - "sync" - - "github.com/purpleidea/mgmt/event" - "github.com/purpleidea/mgmt/pgraph" - "github.com/purpleidea/mgmt/util" - - errwrap "github.com/pkg/errors" -) - -//go:generate stringer -type=graphState -output=graphstate_stringer.go -type graphState uint - -const ( - graphStateNil graphState = iota - graphStateStarting - graphStateStarted - graphStatePausing - graphStatePaused -) - -// getState returns the state of the graph. This state is used for optimizing -// certain algorithms by knowing what part of processing the graph is currently -// undergoing. -func getState(g *pgraph.Graph) graphState { - //mutex := StateLockFromGraph(g) - //mutex.Lock() - //defer mutex.Unlock() - if u, ok := g.Value("state"); ok { - return graphState(util.Uint(u)) - } - return graphStateNil -} - -// setState sets the graph state and returns the previous state. 
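
The selective start logic in MGraph.Start() above only gives an initial poke to vertices whose indegree is zero on the first run, and relies on transitive pokes flowing through the DAG to reach everything else (on an unpause, by contrast, every vertex gets poked). A minimal sketch of that rule in isolation, using a hypothetical string-keyed indegree map rather than the real pgraph types:

```golang
package main

import "fmt"

// starters returns the vertices that need an initial poke: in a DAG, poking
// only the vertices with no incoming edges is enough to reach the whole graph
// transitively. The map argument is a stand-in for pgraph's InDegree() result.
func starters(indegree map[string]int) []string {
	var out []string
	for v, n := range indegree {
		if n == 0 { // nothing else will poke this vertex
			out = append(out, v)
		}
	}
	return out
}

func main() {
	// a -> b -> c, plus a standalone d
	indegree := map[string]int{"a": 0, "b": 1, "c": 1, "d": 0}
	fmt.Println(starters(indegree)) // a and d (map iteration order is not deterministic)
}
```
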
-func setState(g *pgraph.Graph, state graphState) graphState { - mutex := StateLockFromGraph(g) - mutex.Lock() - defer mutex.Unlock() - prev := getState(g) - g.SetValue("state", uint(state)) - return prev -} - -// StateLockFromGraph returns a pointer to the state lock stored with the graph, -// otherwise it panics. If one does not exist, it will create it. -func StateLockFromGraph(g *pgraph.Graph) *sync.Mutex { - x, exists := g.Value("mutex") - if !exists { - g.SetValue("mutex", &sync.Mutex{}) - x, _ = g.Value("mutex") - } - - m, ok := x.(*sync.Mutex) - if !ok { - panic("not a *sync.Mutex") - } - return m -} - -// VtoR casts the Vertex into a Res for use. It panics if it can't convert. -func VtoR(v pgraph.Vertex) Res { - res, ok := v.(Res) - if !ok { - panic("not a Res") - } - return res -} - -// GraphSync updates the oldGraph so that it matches the newGraph receiver. It -// leaves identical elements alone so that they don't need to be refreshed. It -// tries to mutate existing elements into new ones, if they support this. -// FIXME: add test cases -func GraphSync(g *pgraph.Graph, oldGraph *pgraph.Graph) (*pgraph.Graph, error) { - - if oldGraph == nil { - var err error - oldGraph, err = pgraph.NewGraph(g.GetName()) // copy over the name - if err != nil { - return nil, errwrap.Wrapf(err, "could not run GraphSync() properly") - } - } - oldGraph.SetName(g.GetName()) // overwrite the name - - var lookup = make(map[pgraph.Vertex]pgraph.Vertex) - var vertexKeep []pgraph.Vertex // list of vertices which are the same in new graph - var edgeKeep []*pgraph.Edge // list of vertices which are the same in new graph - - for v := range g.Adjacency() { // loop through the vertices (resources) - res := VtoR(v) // resource - var vertex pgraph.Vertex - - // step one, direct compare with res.Compare - if vertex == nil { // redundant guard for consistency - fn := func(v pgraph.Vertex) (bool, error) { - return VtoR(v).Compare(res), nil - } - var err error - vertex, err = oldGraph.VertexMatchFn(fn) - if err != nil { - return nil, errwrap.Wrapf(err, "could not VertexMatchFn() resource") - } - } - - // TODO: consider adding a mutate API. - // step two, try and mutate with res.Mutate - //if vertex == nil { // not found yet... - // vertex = oldGraph.MutateMatch(res) - //} - - if vertex == nil { // no match found yet - if err := res.Validate(); err != nil { - return nil, errwrap.Wrapf(err, "could not Validate() resource") - } - vertex = v - oldGraph.AddVertex(vertex) // call standalone in case not part of an edge - } - lookup[v] = vertex // used for constructing edges - vertexKeep = append(vertexKeep, vertex) // append - } - - // get rid of any vertices we shouldn't keep (that aren't in new graph) - for v := range oldGraph.Adjacency() { - if !pgraph.VertexContains(v, vertexKeep) { - // wait for exit before starting new graph! - VtoR(v).SendEvent(event.EventExit, nil) // sync - VtoR(v).WaitGroup().Wait() - oldGraph.DeleteVertex(v) - } - } - - // compare edges - for v1 := range g.Adjacency() { // loop through the vertices (resources) - for v2, e := range g.Adjacency()[v1] { - // we have an edge! - - // lookup vertices (these should exist now) - //res1 := v1.Res // resource - //res2 := v2.Res - //vertex1 := oldGraph.CompareMatch(res1) // now: VertexMatchFn - //vertex2 := oldGraph.CompareMatch(res2) // now: VertexMatchFn - vertex1, exists1 := lookup[v1] - vertex2, exists2 := lookup[v2] - if !exists1 || !exists2 { // no match found, bug? 
- //if vertex1 == nil || vertex2 == nil { // no match found - return nil, fmt.Errorf("new vertices weren't found") // programming error - } - - edge, exists := oldGraph.Adjacency()[vertex1][vertex2] - if !exists || edge.Name != e.Name { // TODO: edgeCmp - edge = e // use or overwrite edge - } - oldGraph.Adjacency()[vertex1][vertex2] = edge // store it (AddEdge) - edgeKeep = append(edgeKeep, edge) // mark as saved - } - } - - // delete unused edges - for v1 := range oldGraph.Adjacency() { - for _, e := range oldGraph.Adjacency()[v1] { - // we have an edge! - if !pgraph.EdgeContains(e, edgeKeep) { - oldGraph.DeleteEdge(e) - } - } - } - - return oldGraph, nil -} diff --git a/resources/pkg.go b/resources/pkg.go index ca2d74abdb..3f9b403f1c 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -179,9 +179,9 @@ func (obj *PkgRes) getNames() []string { // pretty print for header values func (obj *PkgRes) fmtNames(names []string) string { if len(obj.GetGroup()) > 0 { // grouped elements - return fmt.Sprintf("%s[autogroup:(%v)]", obj.GetKind(), strings.Join(names, ",")) + return fmt.Sprintf("%s[autogroup:(%s)]", obj.GetKind(), strings.Join(names, ",")) } - return fmt.Sprintf("%s[%s]", obj.GetKind(), obj.GetName()) + return obj.String() } func (obj *PkgRes) groupMappingHelper() map[string]string { diff --git a/resources/resources.go b/resources/resources.go index 8f306d04fd..5fbe74c93a 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -31,6 +31,7 @@ import ( // TODO: should each resource be a sub-package? "github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/event" + "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/prometheus" "github.com/purpleidea/mgmt/util" @@ -108,6 +109,8 @@ type Data struct { type ResUID interface { GetName() string GetKind() string + fmt.Stringer // String() string + IFF(ResUID) bool IsReversed() bool // true means this resource happens before the generator @@ -181,14 +184,12 @@ type Base interface { Meta() *MetaParams Events() chan *event.Event Data() *Data - IsWorking() bool - IsQuiescing() bool - QuiesceGroup() *sync.WaitGroup - WaitGroup() *sync.WaitGroup - Setup() + Working() *bool + Setup(*pgraph.Graph, pgraph.Vertex, Res) + Update(*pgraph.Graph) Reset() + Exit() Converger() converger.Converger - ConvergerUIDs() (converger.UID, converger.UID, converger.UID) GetState() ResState SetState(ResState) Timestamp() int64 @@ -213,10 +214,7 @@ type Base interface { Stopped() <-chan struct{} // returns when the resource has stopped Starter(bool) Poll() error // poll alternative to watching :( - ProcessChan() chan *event.Event - ProcessSync() *sync.WaitGroup - ProcessExit() - Prometheus() *prometheus.Prometheus + Worker() error } // Res is the minimum interface you need to implement to define a new resource. @@ -237,11 +235,15 @@ type Res interface { // BaseRes is the base struct that gets used in every resource. 
type BaseRes struct { - Name string `yaml:"name"` - MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams + Res Res // pointer to full res + Graph *pgraph.Graph // pointer to graph I'm currently in + Vertex pgraph.Vertex // pointer to vertex I currently am + Recv map[string]*Send // mapping of key to receive on from value + Kind string + Name string `yaml:"name"` + MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams - Kind string data Data timestamp int64 // last updated timestamp state ResState @@ -253,8 +255,8 @@ type BaseRes struct { processLock *sync.Mutex processDone bool - processChan chan *event.Event - processSync *sync.WaitGroup + processChan chan *event.Event // chan that resources send events to + processSync *sync.WaitGroup // blocks until the innerWorker closes converger converger.Converger // converged tracking cuid converger.UID @@ -266,11 +268,10 @@ type BaseRes struct { isStarted bool // did the started chan already close? starter bool // does this have indegree == 0 ? XXX: usually? - quiescing bool // are we quiescing (pause or exit) + quiescing bool // are we quiescing (pause or exit), tell event replay quiesceGroup *sync.WaitGroup waitGroup *sync.WaitGroup working bool // is the Worker() loop running ? - debug bool isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? @@ -278,6 +279,8 @@ type BaseRes struct { refresh bool // does this resource have a refresh to run? //refreshState StatefulBool // TODO: future stateful bool + + debug bool } // UnmarshalYAML is the custom unmarshal handler for the BaseRes struct. It is @@ -303,16 +306,21 @@ type BaseRes struct { // return nil //} -// GetName returns the name of the resource. +// GetName returns the name of the resource UID. func (obj *BaseUID) GetName() string { return obj.Name } -// GetKind returns the kind of the resource. +// GetKind returns the kind of the resource UID. func (obj *BaseUID) GetKind() string { return obj.Kind } +// String returns the canonical string representation for a resource UID. +func (obj *BaseUID) String() string { + return fmt.Sprintf("%s[%s]", obj.GetKind(), obj.GetName()) +} + // IFF looks at two UID's and if and only if they are equivalent, returns true. // If they are not equivalent, it returns false. // Most resources will want to override this method, since it does the important @@ -346,7 +354,7 @@ func (obj *BaseRes) Validate() error { // Init initializes structures like channels if created without New constructor. func (obj *BaseRes) Init() error { if obj.debug { - log.Printf("%s[%s]: Init()", obj.GetKind(), obj.GetName()) + log.Printf("%s: Init()", obj) } if obj.Kind == "" { return fmt.Errorf("resource did not set kind") @@ -364,9 +372,11 @@ func (obj *BaseRes) Init() error { obj.quiescing = false // no quiesce operation is happening at the moment obj.quiesceGroup = &sync.WaitGroup{} + // more useful than a closed channel signal, since it can be re-used + // safely without having to recreate it and worry about stale handles obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! obj.waitGroup.Add(1) - obj.working = true // Worker method should now be running... + //obj.working = true // Worker method should now be running... // FIXME: force a sane default until UnmarshalYAML on *BaseRes works... 
if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked @@ -383,7 +393,7 @@ func (obj *BaseRes) Init() error { // TODO: this StatefulBool implementation could be eventually swappable //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} - if err := obj.Prometheus().AddManagedResource(fmt.Sprintf("%s[%s]", obj.GetKind(), obj.GetName()), obj.GetKind()); err != nil { + if err := obj.Data().Prometheus.AddManagedResource(obj.String(), obj.GetKind()); err != nil { return errwrap.Wrapf(err, "could not increase prometheus counter!") } @@ -393,18 +403,18 @@ func (obj *BaseRes) Init() error { // Close shuts down and performs any cleanup. func (obj *BaseRes) Close() error { if obj.debug { - log.Printf("%s[%s]: Close()", obj.GetKind(), obj.GetName()) + log.Printf("%s: Close()", obj) } obj.pcuid.Unregister() obj.wcuid.Unregister() obj.cuid.Unregister() - obj.working = false // Worker method should now be closing... + //obj.working = false // Worker method should now be closing... close(obj.stopped) obj.waitGroup.Done() - if err := obj.Prometheus().RemoveManagedResource(fmt.Sprintf("%s[%s]", obj.GetKind(), obj.GetName()), obj.GetKind()); err != nil { + if err := obj.Data().Prometheus.RemoveManagedResource(obj.String(), obj.GetKind()); err != nil { return errwrap.Wrapf(err, "could not decrease prometheus counter!") } @@ -451,54 +461,56 @@ func (obj *BaseRes) Data() *Data { return &obj.data } -// IsWorking tells us if the Worker() function is running. Not thread safe. -func (obj *BaseRes) IsWorking() bool { - return obj.working +// Working returns a pointer to the bool which should track Worker run state. +func (obj *BaseRes) Working() *bool { + return &obj.working } -// IsQuiescing returns if there is a quiesce operation in progress. Pause and -// exit both meet this criteria, and this tells some systems to wind down, such -// as the event replay mechanism. -func (obj *BaseRes) IsQuiescing() bool { return obj.quiescing } - -// QuiesceGroup returns the sync group associated with the quiesce operations. -func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup } - -// WaitGroup returns a sync.WaitGroup which is open when the resource is done. -// This is more useful than a closed channel signal, since it can be re-used -// safely without having to recreate it and worry about stale channel handles. -func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup } - // Setup does some work which must happen before the Worker starts. It happens // once per Worker startup. It can happen in parallel with other Setup calls, so // add locks around any operation that's not thread-safe. -func (obj *BaseRes) Setup() { +func (obj *BaseRes) Setup(graph *pgraph.Graph, vertex pgraph.Vertex, res Res) { obj.started = make(chan struct{}) // closes when started obj.stopped = make(chan struct{}) // closes when stopped obj.eventsLock = &sync.Mutex{} obj.eventsDone = false obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events + + obj.Res = res // store a pointer to the full object + obj.Vertex = vertex // store a pointer to the vertex i'm + obj.Graph = graph // store a pointer to the graph we're in +} + +// Update refreshes the internal graph pointer that we're primarily used in. +func (obj *BaseRes) Update(graph *pgraph.Graph) { + obj.Graph = graph // store a pointer to the graph i'm in } // Reset from Setup. These can get called for different vertices in parallel. 
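
The Base interface change above replaces IsWorking() bool with Working() *bool so the graph engine can both read and set the flag from outside the resource (MGraph.Start flips it just before launching the Worker goroutine). A tiny, self-contained illustration of that design choice, with a hypothetical base type standing in for BaseRes:

```golang
package main

import "fmt"

// base is a hypothetical stand-in for BaseRes; only the working flag matters here.
type base struct {
	working bool
}

// Working returns a pointer so callers can read and write the same variable.
func (b *base) Working() *bool { return &b.working }

func main() {
	b := &base{}
	if !*b.Working() { // read: Worker() is not running yet
		*b.Working() = true // write: mark it running before starting the goroutine
	}
	fmt.Println(b.working) // true
}
```
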
func (obj *BaseRes) Reset() { + obj.Res = nil + obj.Vertex = nil + obj.Graph = nil return } +// Exit the resource. Wrapper function to keep the logic in one place for now. +func (obj *BaseRes) Exit() { + // XXX: consider instead doing this by closing the Res.events channel instead? + // XXX: do this by sending an exit signal, and then returning + // when we hit the 'default' in the select statement! + // XXX: we can do this to quiesce, but it's not necessary now + obj.SendEvent(event.EventExit, nil) // sync + obj.waitGroup.Wait() +} + // Converger returns the converger object used by the system. It can be used to // register new convergers if needed. func (obj *BaseRes) Converger() converger.Converger { return obj.data.Converger } -// ConvergerUIDs returns the ConvergerUIDs for the resource. This is called by -// the various methods that need one of these ConvergerUIDs. They are registered -// by the Init method and unregistered on the resource Close. -func (obj *BaseRes) ConvergerUIDs() (cuid, wcuid, pcuid converger.UID) { - return obj.cuid, obj.wcuid, obj.pcuid -} - // GetState returns the state of the resource. func (obj *BaseRes) GetState() ResState { return obj.state @@ -507,7 +519,7 @@ func (obj *BaseRes) GetState() ResState { // SetState sets the state of the resource. func (obj *BaseRes) SetState(state ResState) { if obj.debug { - log.Printf("%s[%s]: State: %v -> %v", obj.GetKind(), obj.GetName(), obj.GetState(), state) + log.Printf("%s: State: %v -> %v", obj, obj.GetState(), state) } obj.state = state } @@ -533,12 +545,6 @@ func (obj *BaseRes) StateOK(b bool) { obj.isStateOK = b } -// ProcessChan returns the chan that resources send events to. Internal API! -func (obj *BaseRes) ProcessChan() chan *event.Event { return obj.processChan } - -// ProcessSync returns the WaitGroup that blocks until the innerWorker closes. -func (obj *BaseRes) ProcessSync() *sync.WaitGroup { return obj.processSync } - // ProcessExit causes the innerWorker to close and waits until it does so. func (obj *BaseRes) ProcessExit() { obj.processLock.Lock() // lock to avoid a send when closed! @@ -671,7 +677,7 @@ func (obj *BaseRes) VarDir(extra string) (string, error) { uid := obj.GetName() p := fmt.Sprintf("%s/", path.Join(obj.prefix, obj.GetKind(), uid, extra)) if err := os.MkdirAll(p, 0770); err != nil { - return "", errwrap.Wrapf(err, "can't create prefix for %s[%s]", obj.GetKind(), obj.GetName()) + return "", errwrap.Wrapf(err, "can't create prefix for %s", obj) } return p, nil } @@ -688,8 +694,6 @@ func (obj *BaseRes) Starter(b bool) { obj.starter = b } // Poll is the watch replacement for when we want to poll, which outputs events. func (obj *BaseRes) Poll() error { - cuid, _, _ := obj.ConvergerUIDs() // get the converger uid used to report status - // create a time.Ticker for the given interval ticker := time.NewTicker(time.Duration(obj.Meta().Poll) * time.Second) defer ticker.Stop() @@ -698,19 +702,19 @@ func (obj *BaseRes) Poll() error { if err := obj.Running(); err != nil { return err // bubble up a NACK... 
} - cuid.SetConverged(false) // quickly stop any converge due to Running() + obj.cuid.SetConverged(false) // quickly stop any converge due to Running() var send = false var exit *error for { select { case <-ticker.C: // received the timer event - log.Printf("%s[%s]: polling...", obj.GetKind(), obj.GetName()) + log.Printf("%s: polling...", obj) send = true obj.StateOK(false) // dirty case event := <-obj.Events(): - cuid.ResetTimer() // important + obj.cuid.ResetTimer() // important if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } @@ -723,9 +727,13 @@ func (obj *BaseRes) Poll() error { } } -// Prometheus returns the prometheus instance. -func (obj *BaseRes) Prometheus() *prometheus.Prometheus { - return obj.Data().Prometheus +// VtoR casts the Vertex into a Res for use. It panics if it can't convert. +func VtoR(v pgraph.Vertex) Res { + res, ok := v.(Res) + if !ok { + panic("not a Res") + } + return res } // TODO: consider adding a mutate API. diff --git a/resources/sendrecv.go b/resources/sendrecv.go index cde89cecb1..2242181362 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -46,9 +46,9 @@ func (obj *BaseRes) Event() error { func (obj *BaseRes) SendEvent(ev event.Kind, err error) error { if obj.debug { if err == nil { - log.Printf("%s[%s]: SendEvent(%+v)", obj.GetKind(), obj.GetName(), ev) + log.Printf("%s: SendEvent(%+v)", obj, ev) } else { - log.Printf("%s[%s]: SendEvent(%+v): %v", obj.GetKind(), obj.GetName(), ev, err) + log.Printf("%s: SendEvent(%+v): %v", obj, ev, err) } } resp := event.NewResp() @@ -129,7 +129,7 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { continue // silently discard this event while paused } // if we get a poke event here, it's a bug! - err = fmt.Errorf("%s[%s]: unknown event: %v, while paused", obj.GetKind(), obj.GetName(), e) + err = fmt.Errorf("%s: unknown event: %v, while paused", obj, e) panic(err) // TODO: return a special sentinel instead? //return &err, false } @@ -149,8 +149,7 @@ func (obj *BaseRes) Running() error { // converge timeout is very short ( ~ 1s) and the Watch method doesn't // immediately SetConverged(false) to stop possible early termination. if obj.Meta().Poll == 0 { // if not polling, unblock this... - cuid, _, _ := obj.ConvergerUIDs() - cuid.SetConverged(true) // a reasonable initial assumption + obj.cuid.SetConverged(true) // a reasonable initial assumption } obj.StateOK(false) // assume we're initially dirty @@ -179,7 +178,7 @@ type Send struct { func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) { if obj.debug { // NOTE: this could expose private resource data like passwords - log.Printf("%s[%s]: SendRecv: %+v", obj.GetKind(), obj.GetName(), obj.Recv) + log.Printf("%s: SendRecv: %+v", obj, obj.Recv) } var updated = make(map[string]bool) // list of updated keys var err error @@ -205,7 +204,7 @@ func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) { // i think we probably want the same kind, at least for now... if kind1 != kind2 { - e := fmt.Errorf("kind mismatch between %s[%s]: %s and %s[%s]: %s", v.Res.GetKind(), v.Res.GetName(), kind1, obj.GetKind(), obj.GetName(), kind2) + e := fmt.Errorf("kind mismatch between %s: %s and %s: %s", v.Res, kind1, obj, kind2) err = multierr.Append(err, e) // list of errors continue } @@ -213,21 +212,21 @@ func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) { // if the types don't match, we can't use send->recv // TODO: do we want to relax this for string -> *string ? 
if e := TypeCmp(value1, value2); e != nil { - e := errwrap.Wrapf(e, "type mismatch between %s[%s] and %s[%s]", v.Res.GetKind(), v.Res.GetName(), obj.GetKind(), obj.GetName()) + e := errwrap.Wrapf(e, "type mismatch between %s and %s", v.Res, obj) err = multierr.Append(err, e) // list of errors continue } // if we can't set, then well this is pointless! if !value2.CanSet() { - e := fmt.Errorf("can't set %s[%s].%s", obj.GetKind(), obj.GetName(), k) + e := fmt.Errorf("can't set %s.%s", obj, k) err = multierr.Append(err, e) // list of errors continue } // if we can't interface, we can't compare... if !value1.CanInterface() || !value2.CanInterface() { - e := fmt.Errorf("can't interface %s[%s].%s", obj.GetKind(), obj.GetName(), k) + e := fmt.Errorf("can't interface %s.%s", obj, k) err = multierr.Append(err, e) // list of errors continue } @@ -238,7 +237,7 @@ func (obj *BaseRes) SendRecv(res Res) (map[string]bool, error) { value2.Set(value1) // do it for all types that match updated[k] = true // we updated this key! v.Changed = true // tag this key as updated! - log.Printf("SendRecv: %s[%s].%s -> %s[%s].%s", v.Res.GetKind(), v.Res.GetName(), v.Key, obj.GetKind(), obj.GetName(), k) + log.Printf("SendRecv: %s.%s -> %s.%s", v.Res, v.Key, obj, k) } } return updated, err diff --git a/resources/svc.go b/resources/svc.go index c9641f8c5a..b9e7162ae3 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -196,7 +196,7 @@ func (obj *SvcRes) Watch() error { obj.StateOK(false) // dirty case err := <-subErrors: - return errwrap.Wrapf(err, "unknown %s[%s] error", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "unknown %s error", obj) case event := <-obj.Events(): if exit, send = obj.ReadEvent(event); exit != nil { @@ -267,7 +267,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { } // apply portion - log.Printf("%s[%s]: Apply", obj.GetKind(), obj.GetName()) + log.Printf("%s: Apply", obj) var files = []string{svc} // the svc represented in a list if obj.Startup == "enabled" { _, _, err = conn.EnableUnitFiles(files, false, true) @@ -289,7 +289,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { return false, errwrap.Wrapf(err, "failed to start unit") } if refresh { - log.Printf("%s[%s]: Skipping reload, due to pending start", obj.GetKind(), obj.GetName()) + log.Printf("%s: Skipping reload, due to pending start", obj) } refresh = false // we did a start, so a reload is not needed } else if obj.State == "stopped" { @@ -298,7 +298,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { return false, errwrap.Wrapf(err, "failed to stop unit") } if refresh { - log.Printf("%s[%s]: Skipping reload, due to pending stop", obj.GetKind(), obj.GetName()) + log.Printf("%s: Skipping reload, due to pending stop", obj) } refresh = false // we did a stop, so a reload is not needed } @@ -313,7 +313,7 @@ func (obj *SvcRes) CheckApply(apply bool) (checkOK bool, err error) { if refresh { // we need to reload the service // XXX: run a svc reload here! 
- log.Printf("%s[%s]: Reloading...", obj.GetKind(), obj.GetName()) + log.Printf("%s: Reloading...", obj) } // XXX: also set enabled on boot diff --git a/resources/timer.go b/resources/timer.go index cc470fedc7..0c5597f134 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -85,7 +85,7 @@ func (obj *TimerRes) Watch() error { select { case <-obj.ticker.C: // received the timer event send = true - log.Printf("%s[%s]: received tick", obj.GetKind(), obj.GetName()) + log.Printf("%s: received tick", obj) case event := <-obj.Events(): if exit, _ := obj.ReadEvent(event); exit != nil { diff --git a/resources/virt.go b/resources/virt.go index 600b752f96..8291831681 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -137,7 +137,7 @@ func (obj *VirtRes) Init() error { var u *url.URL var err error if u, err = url.Parse(obj.URI); err != nil { - return errwrap.Wrapf(err, "%s[%s]: Parsing URI failed: %s", obj.GetKind(), obj.GetName(), obj.URI) + return errwrap.Wrapf(err, "%s: Parsing URI failed: %s", obj, obj.URI) } switch u.Scheme { case "lxc": @@ -148,7 +148,7 @@ func (obj *VirtRes) Init() error { obj.conn, err = obj.connect() // gets closed in Close method of Res API if err != nil { - return errwrap.Wrapf(err, "%s[%s]: Connection to libvirt failed in init", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "%s: Connection to libvirt failed in init", obj) } // check for hard to change properties @@ -156,14 +156,14 @@ func (obj *VirtRes) Init() error { if err == nil { defer dom.Free() } else if !isNotFound(err) { - return errwrap.Wrapf(err, "%s[%s]: Could not lookup on init", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "%s: Could not lookup on init", obj) } if err == nil { // maxCPUs, err := dom.GetMaxVcpus() i, err := dom.GetVcpusFlags(libvirt.DOMAIN_VCPU_MAXIMUM) if err != nil { - return errwrap.Wrapf(err, "%s[%s]: Could not lookup MaxCPUs on init", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "%s: Could not lookup MaxCPUs on init", obj) } maxCPUs := uint(i) if obj.MaxCPUs != maxCPUs { // max cpu slots is hard to change @@ -176,11 +176,11 @@ func (obj *VirtRes) Init() error { // event handlers so that we don't miss any events via race? xmlDesc, err := dom.GetXMLDesc(0) // 0 means no flags if err != nil { - return errwrap.Wrapf(err, "%s[%s]: Could not GetXMLDesc on init", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "%s: Could not GetXMLDesc on init", obj) } domXML := &libvirtxml.Domain{} if err := domXML.Unmarshal(xmlDesc); err != nil { - return errwrap.Wrapf(err, "%s[%s]: Could not unmarshal XML on init", obj.GetKind(), obj.GetName()) + return errwrap.Wrapf(err, "%s: Could not unmarshal XML on init", obj) } // guest agent: domain->devices->channel->target->state == connected? 
@@ -400,22 +400,22 @@ func (obj *VirtRes) Watch() error { obj.guestAgentConnected = true obj.StateOK(false) // dirty send = true - log.Printf("%s[%s]: Guest agent connected", obj.GetKind(), obj.GetName()) + log.Printf("%s: Guest agent connected", obj) } else if state == libvirt.CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_STATE_DISCONNECTED { obj.guestAgentConnected = false // ignore CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_REASON_DOMAIN_STARTED // events because they just tell you that guest agent channel was added if reason == libvirt.CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_REASON_CHANNEL { - log.Printf("%s[%s]: Guest agent disconnected", obj.GetKind(), obj.GetName()) + log.Printf("%s: Guest agent disconnected", obj) } } else { - return fmt.Errorf("unknown %s[%s] guest agent state: %v", obj.GetKind(), obj.GetName(), state) + return fmt.Errorf("unknown %s guest agent state: %v", obj, state) } case err := <-errorChan: - return fmt.Errorf("unknown %s[%s] libvirt error: %s", obj.GetKind(), obj.GetName(), err) + return fmt.Errorf("unknown %s libvirt error: %s", obj, err) case event := <-obj.Events(): if exit, send = obj.ReadEvent(event); exit != nil { @@ -453,7 +453,7 @@ func (obj *VirtRes) domainCreate() (*libvirt.Domain, bool, error) { if err != nil { return dom, false, err // returned dom is invalid } - log.Printf("%s[%s]: Domain transient %s", state, obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain transient %s", state, obj) return dom, false, nil } @@ -461,20 +461,20 @@ func (obj *VirtRes) domainCreate() (*libvirt.Domain, bool, error) { if err != nil { return dom, false, err // returned dom is invalid } - log.Printf("%s[%s]: Domain defined", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain defined", obj) if obj.State == "running" { if err := dom.Create(); err != nil { return dom, false, err } - log.Printf("%s[%s]: Domain started", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain started", obj) } if obj.State == "paused" { if err := dom.CreateWithFlags(libvirt.DOMAIN_START_PAUSED); err != nil { return dom, false, err } - log.Printf("%s[%s]: Domain created paused", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain created paused", obj) } return dom, false, nil @@ -512,14 +512,14 @@ func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, erro return false, errwrap.Wrapf(err, "domain.Resume failed") } checkOK = false - log.Printf("%s[%s]: Domain resumed", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain resumed", obj) break } if err := dom.Create(); err != nil { return false, errwrap.Wrapf(err, "domain.Create failed") } checkOK = false - log.Printf("%s[%s]: Domain created", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain created", obj) case "paused": if domInfo.State == libvirt.DOMAIN_PAUSED { @@ -533,14 +533,14 @@ func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, erro return false, errwrap.Wrapf(err, "domain.Suspend failed") } checkOK = false - log.Printf("%s[%s]: Domain paused", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain paused", obj) break } if err := dom.CreateWithFlags(libvirt.DOMAIN_START_PAUSED); err != nil { return false, errwrap.Wrapf(err, "domain.CreateWithFlags failed") } checkOK = false - log.Printf("%s[%s]: Domain created paused", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain created paused", obj) case "shutoff": if domInfo.State == libvirt.DOMAIN_SHUTOFF || domInfo.State == libvirt.DOMAIN_SHUTDOWN { @@ -554,7 +554,7 @@ func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) 
(bool, erro return false, errwrap.Wrapf(err, "domain.Destroy failed") } checkOK = false - log.Printf("%s[%s]: Domain destroyed", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain destroyed", obj) } return checkOK, nil @@ -580,7 +580,7 @@ func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error if err := dom.SetMemory(obj.Memory); err != nil { return false, errwrap.Wrapf(err, "domain.SetMemory failed") } - log.Printf("%s[%s]: Memory changed to %d", obj.GetKind(), obj.GetName(), obj.Memory) + log.Printf("%s: Memory changed to %d", obj, obj.Memory) } // check cpus @@ -619,7 +619,7 @@ func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error return false, errwrap.Wrapf(err, "domain.SetVcpus failed") } checkOK = false - log.Printf("%s[%s]: CPUs (hot) changed to %d", obj.GetKind(), obj.GetName(), obj.CPUs) + log.Printf("%s: CPUs (hot) changed to %d", obj, obj.CPUs) case libvirt.DOMAIN_SHUTOFF, libvirt.DOMAIN_SHUTDOWN: if !obj.Transient { @@ -631,7 +631,7 @@ func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error return false, errwrap.Wrapf(err, "domain.SetVcpus failed") } checkOK = false - log.Printf("%s[%s]: CPUs (cold) changed to %d", obj.GetKind(), obj.GetName(), obj.CPUs) + log.Printf("%s: CPUs (cold) changed to %d", obj, obj.CPUs) } default: @@ -662,7 +662,7 @@ func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error return false, errwrap.Wrapf(err, "domain.SetVcpus failed") } checkOK = false - log.Printf("%s[%s]: CPUs (guest) changed to %d", obj.GetKind(), obj.GetName(), obj.CPUs) + log.Printf("%s: CPUs (guest) changed to %d", obj, obj.CPUs) } } @@ -686,7 +686,7 @@ func (obj *VirtRes) domainShutdownSync(apply bool, dom *libvirt.Domain) (bool, e return false, errwrap.Wrapf(err, "domain.GetInfo failed") } if domInfo.State == libvirt.DOMAIN_SHUTOFF || domInfo.State == libvirt.DOMAIN_SHUTDOWN { - log.Printf("%s[%s]: Shutdown", obj.GetKind(), obj.GetName()) + log.Printf("%s: Shutdown", obj) break } @@ -698,7 +698,7 @@ func (obj *VirtRes) domainShutdownSync(apply bool, dom *libvirt.Domain) (bool, e obj.processExitChan = make(chan struct{}) // if machine shuts down before we call this, we error; // this isn't ideal, but it happened due to user error! 
- log.Printf("%s[%s]: Running shutdown", obj.GetKind(), obj.GetName()) + log.Printf("%s: Running shutdown", obj) if err := dom.Shutdown(); err != nil { // FIXME: if machine is already shutdown completely, return early return false, errwrap.Wrapf(err, "domain.Shutdown failed") @@ -719,7 +719,7 @@ func (obj *VirtRes) domainShutdownSync(apply bool, dom *libvirt.Domain) (bool, e // https://libvirt.org/formatdomain.html#elementsEvents continue case <-timeout: - return false, fmt.Errorf("%s[%s]: didn't shutdown after %d seconds", obj.GetKind(), obj.GetName(), MaxShutdownDelayTimeout) + return false, fmt.Errorf("%s: didn't shutdown after %d seconds", obj, MaxShutdownDelayTimeout) } } @@ -791,7 +791,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { if err := dom.Undefine(); err != nil { return false, errwrap.Wrapf(err, "domain.Undefine failed") } - log.Printf("%s[%s]: Domain undefined", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain undefined", obj) } else { domXML, err := dom.GetXMLDesc(libvirt.DOMAIN_XML_INACTIVE) if err != nil { @@ -800,7 +800,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { if _, err = obj.conn.DomainDefineXML(domXML); err != nil { return false, errwrap.Wrapf(err, "conn.DomainDefineXML failed") } - log.Printf("%s[%s]: Domain defined", obj.GetKind(), obj.GetName()) + log.Printf("%s: Domain defined", obj) } checkOK = false } @@ -848,7 +848,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { // we had to do a restart, we didn't, and we should error if it was needed if obj.restartScheduled && restart == true && obj.RestartOnDiverge == "error" { - return false, fmt.Errorf("%s[%s]: needed restart but didn't! (RestartOnDiverge: %v)", obj.GetKind(), obj.GetName(), obj.RestartOnDiverge) + return false, fmt.Errorf("%s: needed restart but didn't! (RestartOnDiverge: %v)", obj, obj.RestartOnDiverge) } return checkOK, nil // w00t diff --git a/yamlgraph/gconfig.go b/yamlgraph/gconfig.go index b5a23daacf..50c546ed74 100644 --- a/yamlgraph/gconfig.go +++ b/yamlgraph/gconfig.go @@ -194,7 +194,7 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world resources.World, if matched { // we've already matched this resource, should we match again? - log.Printf("Config: Warning: Matching %s[%s] again!", kind, res.GetName()) + log.Printf("Config: Warning: Matching %s again!", res) } matched = true @@ -207,7 +207,7 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world resources.World, res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern } - log.Printf("Collect: %s[%s]: collected!", kind, res.GetName()) + log.Printf("Collect: %s: collected!", res) // XXX: similar to other resource add code: if _, exists := lookup[kind]; !exists { diff --git a/yamlgraph2/gconfig.go b/yamlgraph2/gconfig.go index c12522c010..b4b2bc947e 100644 --- a/yamlgraph2/gconfig.go +++ b/yamlgraph2/gconfig.go @@ -240,7 +240,7 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world resources.World, if matched { // we've already matched this resource, should we match again? 
- log.Printf("Config: Warning: Matching %s[%s] again!", kind, res.GetName()) + log.Printf("Config: Warning: Matching %s again!", res) } matched = true @@ -253,7 +253,7 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world resources.World, res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern } - log.Printf("Collect: %s[%s]: collected!", kind, res.GetName()) + log.Printf("Collect: %s: collected!", res) // XXX: similar to other resource add code: if _, exists := lookup[kind]; !exists {