Skip to content

Commit

Permalink
add node workloads number indexer:
Browse files Browse the repository at this point in the history
- add new job to the indexer
- update the `/stats` endpoint to view numWorkloads on network
- update schema/crafter to generate dump data
- update stats tests
  • Loading branch information
Omarabdul3ziz committed May 19, 2024
1 parent c060542 commit fa46309
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 14 deletions.
37 changes: 25 additions & 12 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,20 @@ type flags struct {
mnemonics string
maxPoolOpenConnections int

noIndexer bool // true to stop the indexer, useful on running for testing
indexerUpserterBatchSize uint
gpuIndexerIntervalMins uint
gpuIndexerNumWorkers uint
healthIndexerNumWorkers uint
healthIndexerIntervalMins uint
dmiIndexerNumWorkers uint
dmiIndexerIntervalMins uint
speedIndexerNumWorkers uint
speedIndexerIntervalMins uint
ipv6IndexerNumWorkers uint
ipv6IndexerIntervalMins uint
noIndexer bool // true to stop the indexer, useful on running for testing
indexerUpserterBatchSize uint
gpuIndexerIntervalMins uint
gpuIndexerNumWorkers uint
healthIndexerNumWorkers uint
healthIndexerIntervalMins uint
dmiIndexerNumWorkers uint
dmiIndexerIntervalMins uint
speedIndexerNumWorkers uint
speedIndexerIntervalMins uint
ipv6IndexerNumWorkers uint
ipv6IndexerIntervalMins uint
workloadsIndexerNumWorkers uint
workloadsIndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -103,6 +105,8 @@ func main() {
flag.UintVar(&f.speedIndexerNumWorkers, "speed-indexer-workers", 100, "number of workers checking on node speed")
flag.UintVar(&f.ipv6IndexerIntervalMins, "ipv6-indexer-interval", 60*24, "node ipv6 check interval in min")
flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6")
flag.UintVar(&f.workloadsIndexerIntervalMins, "workloads-indexer-interval", 60, "node workloads check interval in min")
flag.UintVar(&f.workloadsIndexerNumWorkers, "workloads-indexer-workers", 10, "number of workers checking on node workloads number")
flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -204,6 +208,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.ipv6IndexerNumWorkers,
)
ipv6Idx.Start(ctx)

wlNumIdx := indexer.NewIndexer[types.NodesWorkloads](
indexer.NewWorkloadWork(f.ipv6IndexerIntervalMins),
"workloads",
db,
rpcRmbClient,
f.ipv6IndexerNumWorkers,
)
wlNumIdx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,11 @@ func (p *PostgresDatabase) UpsertNodeIpv6Report(ctx context.Context, ips []types
}
return p.gormDB.WithContext(ctx).Table("node_ipv6").Clauses(onConflictClause).Create(&ips).Error
}

func (p *PostgresDatabase) UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"workloads_number"}),
}
return p.gormDB.WithContext(ctx).Table("node_workloads").Clauses(conflictClause).Create(&workloads).Error
}
9 changes: 9 additions & 0 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.Dmi{},
&types.Speed{},
&types.HasIpv6{},
&types.NodesWorkloads{},
)
if err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
Expand Down Expand Up @@ -186,6 +187,14 @@ func (d *PostgresDatabase) GetStats(ctx context.Context, filter types.StatsFilte
return stats, errors.Wrap(res.Error, "couldn't get dedicated nodes count")
}

if err := d.gormDB.WithContext(ctx).Table("node").
Select("SUM(workloads_number) as workloads_number").
Where(condition).
Joins("LEFT JOIN node_workloads ON node.twin_id = node_workloads.node_twin_id").
Scan(&stats.WorkloadsNumber).Error; err != nil {
return stats, errors.Wrap(res.Error, "couldn't sum workloads number")
}

return stats, nil
}

Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Database interface {
UpsertNodeDmi(ctx context.Context, dmis []types.Dmi) error
UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error
}

type ContractBilling types.ContractBilling
Expand Down
11 changes: 10 additions & 1 deletion grid-proxy/internal/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,17 @@ Work a struct that implement the interface `Work` which have three methods:
- Default caller worker number: 1
- Dump table: `dmi`
4. Speed indexer:

- Function: get the network upload/download speed on the node tested against `iperf` server.
- Interval: `5 min`
- Default caller worker number: 100
- Dump table: `speed`
5. Ipv6 indexer:
- Function: decide if the node has ipv6 or not.
- Interval: `1 day`
- Default caller worker number: 10
- Dump table: `node_ipv6`
6. Workloads indexer:
- Function: get the number of workloads on each node.
- Interval: `1 hour`
- Default caller worker number: 10
- Dump table: `node_workloads`
53 changes: 53 additions & 0 deletions grid-proxy/internal/indexer/workload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package indexer

import (
"context"
"time"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
)

const (
statsCall = "zos.statistics.get"
)

type WorkloadWork struct {
findersInterval map[string]time.Duration
}

func NewWorkloadWork(interval uint) *WorkloadWork {
return &WorkloadWork{
findersInterval: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
},
}
}

func (w *WorkloadWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *WorkloadWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodesWorkloads, error) {
var response struct {
Users struct {
Workloads uint32 `json:"workloads"`
} `json:"users"`
}

if err := callNode(ctx, rmb, statsCall, nil, twinId, &response); err != nil {
return []types.NodesWorkloads{}, err
}

return []types.NodesWorkloads{
{
NodeTwinId: twinId,
WorkloadsNumber: response.Users.Workloads,
},
}, nil
}

func (w *WorkloadWork) Upsert(ctx context.Context, db db.Database, batch []types.NodesWorkloads) error {
return db.UpsertNodeWorkloads(ctx, batch)
}
10 changes: 10 additions & 0 deletions grid-proxy/pkg/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ func (Speed) TableName() string {
return "speed"
}

// NodesWorkloads holds the number of workloads on a node
type NodesWorkloads struct {
NodeTwinId uint32 `json:"node_twin_id,omitempty" gorm:"unique;not null"`
WorkloadsNumber uint32 `json:"workloads_number"`
}

func (NodesWorkloads) TableName() string {
return "node_workloads"
}

// Dmi holds hardware dmi info for a node
// used as both gorm model and server json response
type Dmi struct {
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/pkg/types/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Stats struct {
NodesDistribution map[string]int64 `json:"nodesDistribution" gorm:"-:all"`
GPUs int64 `json:"gpus"`
DedicatedNodes int64 `json:"dedicatedNodes"`
WorkloadsNumber uint32 `json:"workloads_number"`
}

// StatsFilter statistics filters
Expand Down
4 changes: 3 additions & 1 deletion grid-proxy/tests/queries/mock_client/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (g *GridProxyMockClient) Stats(ctx context.Context, filter types.StatsFilte
res.Contracts += int64(len(g.data.NameContracts))
distribution := map[string]int64{}
dedicatedNodesCount := int64(0)
workloadsNumber := uint32(0)
var gpus int64
for _, node := range g.data.Nodes {
nodePower := types.NodePower{
Expand All @@ -41,12 +42,13 @@ func (g *GridProxyMockClient) Stats(ctx context.Context, filter types.StatsFilte
if isDedicatedNode(g.data, node) {
dedicatedNodesCount++
}
workloadsNumber += g.data.WorkloadsNumbers[uint32(node.TwinID)]
}
}
res.Countries = int64(len(distribution))
res.NodesDistribution = distribution
res.GPUs = gpus
res.DedicatedNodes = dedicatedNodesCount

res.WorkloadsNumber = workloadsNumber
return
}
32 changes: 32 additions & 0 deletions grid-proxy/tests/queries/mock_client/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type DBData struct {
DMIs map[uint32]types.Dmi
Speeds map[uint32]types.Speed
PricingPolicies map[uint]PricingPolicy
WorkloadsNumbers map[uint32]uint32

DB *sql.DB
}
Expand Down Expand Up @@ -633,6 +634,33 @@ func loadSpeeds(db *sql.DB, data *DBData) error {
return nil
}

func loadWorkloadsNumber(db *sql.DB, data *DBData) error {
rows, err := db.Query(`
SELECT
node_twin_id,
workloads_number
FROM
node_workloads;
`)

if err != nil {
return err
}

for rows.Next() {
var wl types.NodesWorkloads
if err := rows.Scan(
&wl.NodeTwinId,
&wl.WorkloadsNumber,
); err != nil {
return err
}
data.WorkloadsNumbers[wl.NodeTwinId] = wl.WorkloadsNumber
}

return nil
}

func loadPricingPolicies(db *sql.DB, data *DBData) error {
rows, err := db.Query(`
SELECT
Expand Down Expand Up @@ -719,6 +747,7 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
Speeds: make(map[uint32]types.Speed),
NodeIpv6: make(map[uint32]bool),
PricingPolicies: make(map[uint]PricingPolicy),
WorkloadsNumbers: make(map[uint32]uint32),
DB: db,
}
if err := loadNodes(db, gormDB, &data); err != nil {
Expand Down Expand Up @@ -778,6 +807,9 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
if err := loadPricingPolicies(db, &data); err != nil {
return data, err
}
if err := loadWorkloadsNumber(db, &data); err != nil {
return data, err
}
if err := calcNodesUsedResources(&data); err != nil {
return data, err
}
Expand Down
22 changes: 22 additions & 0 deletions grid-proxy/tools/db/crafter/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,3 +964,25 @@ func (c *Crafter) GenerateNodeIpv6() error {

return nil
}

func (c *Crafter) GenerateNodeWorkloads() error {
start := c.NodeStart
end := c.NodeStart + c.NodeCount
nodeTwinsStart := c.TwinStart + (c.FarmStart + c.FarmCount)

var reports []types.NodesWorkloads
for i := start; i < end; i++ {
report := types.NodesWorkloads{
NodeTwinId: uint32(nodeTwinsStart + i),
WorkloadsNumber: uint32(rand.Intn(120)),
}
reports = append(reports, report)
}

if err := c.gormDB.Create(reports).Error; err != nil {
return fmt.Errorf("failed to insert node has workloads reports: %w", err)
}
fmt.Println("node workloads number reports generated")

return nil
}
4 changes: 4 additions & 0 deletions grid-proxy/tools/db/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func generateData(db *sql.DB, gormDB *gorm.DB, seed int) error {
return fmt.Errorf("failed to generate node ipv6 reports: %w", err)
}

if err := generator.GenerateNodeWorkloads(); err != nil {
return fmt.Errorf("failed to generate node workloads reports: %w", err)
}

if err := generator.GeneratePricingPolicies(); err != nil {
return fmt.Errorf("failed to generate PricingPolicies: %w", err)
}
Expand Down
13 changes: 13 additions & 0 deletions grid-proxy/tools/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1093,3 +1093,16 @@ CREATE TABLE IF NOT EXISTS public.node_ipv6 (
ALTER TABLE public.node_ipv6
OWNER TO postgres;


--
-- Name: node_workloads; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE IF NOT EXISTS public.node_workloads (
node_twin_id bigint NOT NULL,
workloads_number numeric
);

ALTER TABLE public.node_workloads
OWNER TO postgres;

0 comments on commit fa46309

Please sign in to comment.