Skip to content

Commit

Permalink
feat: add support for basic subsystem
Browse files Browse the repository at this point in the history
Problem: we will eventually want to have multiple subsystems
Solution: for the time being, create a cluster graph with
a named subsystem lookup. Each item in the lookup is a named
subsystem, with a dominant one defined on the initial cluster
graph creation (currently set to be "nodes"). We can then
register clusters to the dominant subsystem (and eventually
register or add metadata for other subsystems). For an
early prototype, each subsystem graph will have a common
root, off of which named clusters will be defined. Metrics
will be added for each graph under the subsystem to allow a quick
assessment of whether the total resources are sufficient to warrant
traversing the graph (not done yet).

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Feb 29, 2024
1 parent ed6e25d commit e237413
Show file tree
Hide file tree
Showing 20 changed files with 384 additions and 248 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ For more information:

## TODO

- subsystems
- we will want a function to add a new subsystem, right now we have one dominant for nodes
- make also a function to delete subsystems
- we can have top level metrics for quick assessment if cluster is OK
- subsystems should allow for multiple (with keys) and references across to dominant subsystem
- is there a way to unify into one graph?
Expand Down
5 changes: 3 additions & 2 deletions api/v1/rainbow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ service RainbowScheduler {
message RegisterRequest {
  string name = 1;
  string secret = 2;

  // JGF json of nodes (needs to be read in with jsongraph-go)
  string nodes = 3;

  // Field number 4 stays assigned to the timestamp. Reusing an existing
  // field number for a field of a different type breaks wire compatibility
  // with clients and servers built against the earlier definition.
  google.protobuf.Timestamp sent = 4;

  // Name of the subsystem the nodes belong to. It takes the next unused
  // field number so that older messages still decode correctly.
  // NOTE(review): presumably an empty value selects the dominant
  // subsystem - confirm against the server implementation.
  string subsystem = 5;
}

// SubmitJobRequest takes a job name, cluster name
Expand Down
95 changes: 71 additions & 24 deletions backends/memory/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,43 @@ import (
// A ClusterGraph holds one or more subsystems
// TODO add support for >1 subsystem, start with dominant
type ClusterGraph struct {
subsystem Subsystem
subsystem map[string]*Subsystem
lock sync.RWMutex
metrics Metrics
backupFile string

// TODO: cluster level metrics?

// The dominant subsystem is a lookup in the subsystem map
// It defaults to nodes (node resources)
dominantSubsystem string
}

// DominantSubsystem looks up the subsystem registered under the cluster
// graph's dominant subsystem name. The result is nil when the lookup has no
// entry under that name.
func (g *ClusterGraph) DominantSubsystem() *Subsystem {
	name := g.dominantSubsystem
	return g.subsystem[name]
}

// getSubsystem resolves a subsystem name: a non-empty name is returned
// unchanged, and an empty name resolves to the dominant subsystem's name.
// Only the name is returned; the lookup itself is left to the caller.
func (g *ClusterGraph) getSubsystem(subsystem string) string {
	if subsystem != "" {
		return subsystem
	}
	return g.dominantSubsystem
}

// NewClusterGraph creates a new cluster graph with a dominant subsystem
// TODO we will want a function that can add a new subsystem
func NewClusterGraph() *ClusterGraph {

// For now, the dominant subsystem is hard coded to be nodes (resources)
dominant := "nodes"
subsystem := NewSubsystem()
subsystems := map[string]*Subsystem{dominant: subsystem}

// TODO options / algorithms can come from config
// TODO: we should allow multiple subsystems here (node resources are dominant)
g := &ClusterGraph{
subsystem: NewSubsystem(),
metrics: Metrics{},
subsystem: subsystems,
dominantSubsystem: dominant,
}
// Listen for syscalls to exit
g.awaitExit()
Expand Down Expand Up @@ -89,17 +112,22 @@ func (g *ClusterGraph) Close() error {
return fp.Close()
}

func (g *ClusterGraph) GetMetrics() Metrics {
m := g.metrics
m.Vertices = g.subsystem.CountVertices()
return m
// GetMetrics returns the metrics of a named subsystem.
//
// An empty name selects the dominant subsystem. When nothing is registered
// under the resolved name, the zero-value Metrics is returned; the map lookup
// would otherwise yield a nil *Subsystem and reading its Metrics would panic.
func (g *ClusterGraph) GetMetrics(subsystem string) Metrics {
	subsystem = g.getSubsystem(subsystem)

	ss, ok := g.subsystem[subsystem]
	if !ok || ss == nil {
		return Metrics{}
	}
	return ss.Metrics
}

// Register cluster should:
// 1. Load in json graph of nodes from string
// 2. Add nodes to the graph, also keep top level metrics?
// 3. Return corresponding response
func (g *ClusterGraph) RegisterCluster(name string, payload string) (*service.Response, error) {
func (g *ClusterGraph) RegisterCluster(
name string,
payload string,
subsystem string,
) (*service.Response, error) {

// Prepare a response
response := service.Response{}
Expand All @@ -110,14 +138,21 @@ func (g *ClusterGraph) RegisterCluster(name string, payload string) (*service.Re
return nil, errors.New("cluster nodes are invalid")
}

// Load jgf into graph!
err = g.LoadClusterNodes(name, &nodes)
// Load jgf into graph for that subsystem!
err = g.LoadClusterNodes(name, &nodes, subsystem)

// do something with g.subsystem
return &response, err
}

func (g *ClusterGraph) LoadClusterNodes(name string, nodes *jgf.JsonGraph) error {
func (g *ClusterGraph) LoadClusterNodes(
name string,
nodes *jgf.JsonGraph,
subsystem string,
) error {

// Fall back to dominant subsystem name
subsystem = g.getSubsystem(subsystem)

// Let's be pedantic - no clusters allowed without nodes or edges
nNodes := len(nodes.Graph.Nodes)
Expand All @@ -126,21 +161,32 @@ func (g *ClusterGraph) LoadClusterNodes(name string, nodes *jgf.JsonGraph) error
return fmt.Errorf("cluster must have at least one edge and node")
}

// Grab the current subsystem - it must exist
ss, ok := g.subsystem[subsystem]
if !ok {
return fmt.Errorf("subsystem %s does not exist. Ensure it is created first", subsystem)
}

g.lock.Lock()
defer g.lock.Unlock()
log.Printf("Preparing to load %d nodes and %d edges\n", nNodes, nEdges)

// Get the root vertex, every new cluster starts there!
root, exists := g.subsystem.GetNode("root")
// Get the root vertex, every new subsystem starts there!
root, exists := ss.GetNode("root")
if !exists {
return fmt.Errorf("root node does not exist, this should not happen")
return fmt.Errorf("root node does not exist for subsystem %s, this should not happen", subsystem)
}

// The cluster root can only exist as one, and needs to be deleted if it does.
_, ok = ss.lookup[name]
if ok {
log.Printf("cluster %s already exists, cleaning up\n", name)
delete(ss.lookup, name)
}

// Add a cluster root to it, and connect to the top root
// What can we do with a weight? We probably want metadata too.
// One thing at a time!
clusterRoot := g.subsystem.AddNode(name)
err := g.subsystem.AddEdge(root, clusterRoot, 0)
// Add a cluster root to it, and connect to the top root. We can add metadata/weight here too
clusterRoot := ss.AddNode(name)
err := ss.AddEdge(root, clusterRoot, 0)
if err != nil {
return err
}
Expand All @@ -151,7 +197,7 @@ func (g *ClusterGraph) LoadClusterNodes(name string, nodes *jgf.JsonGraph) error
// This is pretty dumb because we don't add metadata yet, oh well
// we will!
for nid, _ := range nodes.Graph.Nodes {
id := g.subsystem.AddNode("")
id := ss.AddNode("")
lookup[nid] = id
}

Expand All @@ -167,13 +213,14 @@ func (g *ClusterGraph) LoadClusterNodes(name string, nodes *jgf.JsonGraph) error
if !ok {
return fmt.Errorf("destination %s is defined as an edge, but missing as node in graph", edge.Label)
}
err := g.subsystem.AddEdge(src, dest, 0)
err := ss.AddEdge(src, dest, 0)
if err != nil {
return err
}
}

log.Printf("We have made an in memory graph (subsystem) with %d vertices!", g.subsystem.CountVertices())
log.Printf("We have made an in memory graph (subsystem %s) with %d vertices!", subsystem, ss.CountVertices())
g.subsystem[subsystem] = ss
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions backends/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func (m MemoryGraph) Description() string {
// Note that a client can interact with the database (in read only)
// but since this is directly in the rainbow cluster, we call
// the functions directly.
func (m MemoryGraph) AddCluster(name string, nodes *jgf.JsonGraph) error {
func (m MemoryGraph) AddCluster(
name string,
nodes *jgf.JsonGraph,
subsystem string,
) error {

// How this might look for an external client
/* var opts []grpc.DialOption
Expand All @@ -47,7 +51,7 @@ func (m MemoryGraph) AddCluster(name string, nodes *jgf.JsonGraph) error {
client := service.NewMemoryGraphClient(conn)
ctx := context.Background()
client.Register(...) */
return graphClient.LoadClusterNodes(name, nodes)
return graphClient.LoadClusterNodes(name, nodes, subsystem)
}

func (m MemoryGraph) RegisterService(s *grpc.Server) error {
Expand Down
37 changes: 37 additions & 0 deletions backends/memory/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,51 @@
package memory

import (
"fmt"
"runtime"
"sync/atomic"
)

// Metrics holds simple counters describing a graph.
type Metrics struct {
	// Vertices is the vertex count; the author notes it is meant to span
	// all subsystems.
	Vertices int `json:"vertices"`

	// Writes and Reads are running counts of write and read operations.
	Writes int64 `json:"writes"`
	Reads  int64 `json:"reads"`

	// ResourceSummary holds the resource-specific metrics, one Summary per
	// key. A zero-value Metrics leaves this map nil.
	ResourceSummary map[string]Summary
}

// printMemoryStats is a debugging helper: it refreshes mem with the current
// runtime memory statistics and prints a few of them to standard output.
// mem is received by value, so the caller's copy is not updated.
//
// Example usage:
//
//	var mem runtime.MemStats
//	printMemoryStats(mem)
//	runtime.GC()
//
//lint:ignore U1000 Ignore unused function temporarily for debugging
func printMemoryStats(mem runtime.MemStats) {
	runtime.ReadMemStats(&mem)

	// One row per printed statistic, in output order.
	rows := []struct {
		format string
		value  uint64
	}{
		{"mem.Alloc: %d\n", mem.Alloc},
		{"mem.TotalAlloc (cumulative): %d\n", mem.TotalAlloc},
		{"mem.HeapAlloc: %d\n", mem.HeapAlloc},
		{"mem.NumGC: %d\n\n", uint64(mem.NumGC)},
	}
	for _, row := range rows {
		fmt.Printf(row.format, row.value)
	}
}

// Summary is a resource summary: a name plus a count for each type.
// It is assembled while a new graph is being created.
type Summary struct {
	// Name labels the summary.
	Name string

	// Counts maps a type to its count.
	Counts map[string]int64
}

// NewResource resets the resource counters for a cluster.
//
// Any summary already stored under cluster is replaced by a fresh one with
// empty counts. The ResourceSummary map is created on first use, because
// assigning into the nil map of a zero-value Metrics would panic. Counts is
// initialised too, so counters can be written into the new summary directly.
func (m *Metrics) NewResource(cluster string) {
	if m.ResourceSummary == nil {
		m.ResourceSummary = make(map[string]Summary)
	}
	m.ResourceSummary[cluster] = Summary{
		Name:   cluster,
		Counts: make(map[string]int64),
	}
}

// AddResource is a placeholder: it takes no arguments and its body is empty,
// so calling it currently has no effect.
func (m *Metrics) AddResource() {

}

func (m *Metrics) IncWriteCount() *Metrics {
Expand Down
2 changes: 1 addition & 1 deletion backends/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type MemoryServer struct {

// Register takes a cluster node payload and adds to the in memory graph
func (MemoryServer) Register(c context.Context, req *service.RegisterRequest) (*service.Response, error) {
response, err := graphClient.RegisterCluster(req.Name, req.Payload)
response, err := graphClient.RegisterCluster(req.Name, req.Payload, req.Subsystem)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e237413

Please sign in to comment.