Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the agent-cache-refresher on the explorer backend #170

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ WORKDIR /src
COPY . .
RUN apk add --no-cache gcc musl-dev
RUN go build -o explorer-stats-api ./cmd/api/
RUN go build -o cache-agent-refresh ./cmd/cache-agent-refresh/

FROM alpine:3.17
COPY --from=build /src/explorer-stats-api /bin/
COPY --from=build /src/cache-agent-refresh /bin/
EXPOSE 5000
EXPOSE 5050
EXPOSE 5070
Expand Down
335 changes: 335 additions & 0 deletions cmd/cache-agent-refresh/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"

Check failure on line 6 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

import 'io/ioutil' is not allowed from list 'main': Use os instead (depguard)
"log"
"math"
"net/http"
"os"
"strconv"
"strings"
"sync"

"github.com/urfave/cli/v2"
)

/* This is a simple golang app which controls the cache from the API.
*/

var (
// Build metadata.

Check failure on line 22 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
version string
commit string
branch string

// All max_interval_* values are expressed in minutes.

Check failure on line 27 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
max_interval_epoch_current int = 30 // 30 minutes
max_interval_epoch_past int = 1440 // 1 day
max_interval_overview int = 60 // 1 hour
max_interval_smeshers_current int = 30 // 30 minutes
max_interval_smeshers_past int = 1440 // 1 day
max_interval_smeshers int = 60 // 1 hour
max_interval_circulation int = 30 // 30 minutes

// App settings
// metricsPortFlag string

Check failure on line 37 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
layersPerEpoch int
targetNodesFlag string // comma separated list of target nodes
targetNodesJsonPortFlag int
targetNodesRefreshPortFlag int
targetNodesRefreshMetricFlag string
)

const (
refresh_path_epoch string = "/refresh/epoch/:id"
refresh_path_epoch_decentral string = "/refresh/epoch/:id/decentral"
// refresh_path_account string = "/refresh/account/:address" // TODO: review with @kacpersaw, if we can remove this from metrics, that can blow the prometheus metrics

Check failure on line 48 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
kacpersaw marked this conversation as resolved.
Show resolved Hide resolved
refresh_path_smeshers_per_epoch string = "/refresh/smeshers/:epoch"
refresh_path_smeshers string = "/refresh/smeshers"
// refresh_path_smesher string = "/refresh/smesher/:smesherId" // TODO: review with @kacpersaw, if we can remove this from metrics, that can blow the prometheus metrics

Check failure on line 51 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
kacpersaw marked this conversation as resolved.
Show resolved Hide resolved
refresh_path_overview string = "/refresh/overview"
refresh_path_circulation string = "/refresh/circulation"
)

// is_sync reports whether a node counts as synced: its processed layer may
// trail the latest layer by at most tolerance layers.
func is_sync(latest_layer, processed_layer, tolerance int) bool {
	return processed_layer >= latest_layer-tolerance
}

// get_current_epoch returns the zero-based epoch that current_layer belongs
// to, i.e. current_layer divided by leyers_per_epoch, rounded down. A partly
// elapsed epoch never rounds up to the next one.
//
// A non-positive leyers_per_epoch cannot describe an epoch; 0 is returned in
// that case instead of converting the Inf/NaN produced by the division.
func get_current_epoch(leyers_per_epoch, current_layer int) int {
	if leyers_per_epoch <= 0 {
		return 0
	}
	return int(math.Floor(float64(current_layer) / float64(leyers_per_epoch)))
}

// prometheus_metrics_parcer extracts a sample value from a Prometheus text
// scrape.
//
// It scans the scrape line by line, ignoring comment lines (those starting
// with '#', such as HELP and TYPE), and remembers the last line that contains
// both metric_name and label_value. The last whitespace-separated field of
// that line is parsed as a float64 and returned.
//
// It returns 0 when no line matches or when the value cannot be parsed.
//
// NOTE(review): matching is by substring, so a label_value such as
// "/refresh/smeshers" also matches a line labelled "/refresh/smeshers/12" —
// confirm against the label format the API actually exports.
func prometheus_metrics_parcer(prometheus_metric_scrape, metric_name, label_value string) float64 {
	var last_match string
	for _, line := range strings.Split(prometheus_metric_scrape, "\n") {
		if strings.HasPrefix(strings.TrimSpace(line), "#") {
			continue
		}
		if strings.Contains(line, metric_name) && strings.Contains(line, label_value) {
			last_match = line
		}
	}

	// Fields (rather than Split on a single space) tolerates trailing
	// whitespace and "\r\n" line endings.
	fields := strings.Fields(last_match)
	if len(fields) == 0 {
		return 0
	}

	result, err := strconv.ParseFloat(fields[len(fields)-1], 64)
	if err != nil {
		return 0
	}
	return result
}

// NodeStatus holds the layer counters that get_status decodes from a node's
// status response body.
// NOTE(review): the JSON keys are snake_case here; confirm they match the
// field names the node's Status endpoint actually emits.
type NodeStatus struct {
// ProcessedLayer is compared against LatestLayer to decide whether the node
// is synced.
ProcessedLayer int `json:"processed_layer"`
// LatestLayer is also the layer from which the current epoch is derived.
LatestLayer int `json:"latest_layer"`
}

// get_status queries a node's status endpoint and decodes the reply.
//
// node is used verbatim as the URL prefix (so it must carry the scheme, e.g.
// "http://host") and port is appended to it. The request is an HTTP POST with
// no body to /spacemesh.v2alpha1.NodeService/Status.
//
// A non-nil error is returned when the request fails, when the node answers
// with anything other than 200 OK, or when the body cannot be read or parsed.
// In every error case the returned NodeStatus is the zero value, so callers
// never mistake a failed probe for a node sitting at layer 0.
func get_status(node string, port int) (status NodeStatus, err error) {
	url := fmt.Sprintf("%s:%d/spacemesh.v2alpha1.NodeService/Status", node, port)

	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		// resp is nil here, so it must not be inspected.
		fmt.Printf("Error checking node %s:%d: %v\n", node, port, err)
		return NodeStatus{}, fmt.Errorf("checking node %s:%d: %w", node, port, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Printf("Error checking node %s:%d, status code: %d\n", node, port, resp.StatusCode)
		return NodeStatus{}, fmt.Errorf("checking node %s:%d: unexpected status code %d", node, port, resp.StatusCode)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("Error reading response from node %s:%d\n", node, port)
		return NodeStatus{}, fmt.Errorf("reading response from node %s:%d: %w", node, port, err)
	}

	var parsed NodeStatus
	if err := json.Unmarshal(body, &parsed); err != nil {
		fmt.Printf("Error parsing JSON response from node %s:%d\n", node, port)
		return NodeStatus{}, fmt.Errorf("parsing response from node %s:%d: %w", node, port, err)
	}

	return parsed, nil
}

// checkNodeStatus checks the status of a node and whether it's synced.

Check failure on line 129 in cmd/cache-agent-refresh/main.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
func checkNodeStatus(node string, port int) (bool, string) {
	// Layers a node may lag behind the latest one and still count as synced.
	const syncTolerance = 2

	nodeStatus, statusErr := get_status(node, port)
	switch {
	case statusErr != nil:
		// The status probe failed: report it and treat the node as unusable.
		fmt.Printf("Error getting status from node %s:%d\n", node, port)
		return false, ""
	case !is_sync(nodeStatus.LatestLayer, nodeStatus.ProcessedLayer, syncTolerance):
		// Reachable but lagging too far behind; nothing is printed for this case.
		return false, ""
	}

	fmt.Printf("Node %s:%d is online and synced\n", node, port)
	return true, node
}

// refresh_cache triggers a cache refresh for one path on a node.
//
// prometheus is the raw metrics scrape; the "cache_agent_last_refresh" sample
// whose line contains path is looked up in it. When that value is below
// interval*60 (interval is given in minutes) nothing is requested and nil is
// returned. Otherwise an HTTP GET is sent to node:port+path, where node is
// used verbatim as the URL prefix.
//
// A non-nil error is returned when the request fails or the node answers
// with anything other than 200 OK.
//
// NOTE(review): a missing metric parses as 0 and therefore also skips the
// refresh — confirm that this is the intended meaning of the metric.
func refresh_cache(node string, port int, path string, interval int, prometheus string) error {
	log.Printf("Refreshing cache for %s:%d%s\n", node, port, path)

	last_refresh := prometheus_metrics_parcer(prometheus, "cache_agent_last_refresh", path)
	if last_refresh < float64(interval*60) {
		return nil
	}

	url := fmt.Sprintf("%s:%d%s", node, port, path)

	resp, err := http.Get(url)
	if err != nil {
		// resp is nil here, so it must not be inspected.
		fmt.Printf("Error refreshing cache for %s:%d%s: %v\n", node, port, path, err)
		return fmt.Errorf("refreshing cache for %s: %w", url, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Printf("Error refreshing cache for %s:%d%s, status code: %d\n", node, port, path, resp.StatusCode)
		return fmt.Errorf("refreshing cache for %s: unexpected status code %d", url, resp.StatusCode)
	}

	log.Printf("Cache refreshed for %s:%d%s\n", node, port, path)
	return nil
}

// epoch_replace fills the epoch placeholders of a refresh path template:
// every ":id" and every ":epoch" is substituted with the decimal form of
// epoch. Paths without placeholders are returned unchanged.
func epoch_replace(path string, epoch int) string {
	number := strconv.Itoa(epoch)
	return strings.NewReplacer(":id", number, ":epoch", number).Replace(path)
}

// flags declares the command-line options of the agent. Each option can also
// be supplied through the environment variable listed in its EnvVars, and its
// parsed value is written to the package-level variable named in Destination.
var flags = []cli.Flag{
	&cli.IntFlag{
		Name:        "layers-per-epoch",
		Usage:       "Number of layers per epoch",
		Required:    false,
		Destination: &layersPerEpoch,
		Value:       4032,
		EnvVars:     []string{"SPACEMESH_LAYERS_PER_EPOCH"},
	},
	&cli.StringFlag{
		Name:        "target-nodes",
		Usage:       "Comma separated list of target nodes",
		Required:    true,
		Destination: &targetNodesFlag,
		EnvVars:     []string{"TARGET_NODES"},
	},
	&cli.IntFlag{
		Name:        "target-nodes-json-port",
		Usage:       "Port for the JSON API of the target nodes",
		Required:    true,
		Destination: &targetNodesJsonPortFlag,
		EnvVars:     []string{"TARGET_NODES_JSON_PORT"},
	},
	&cli.IntFlag{
		Name:        "target-nodes-refresh-port",
		Usage:       "Port for the refresh API of the target nodes",
		Required:    true,
		Destination: &targetNodesRefreshPortFlag,
		EnvVars:     []string{"TARGET_NODES_REFRESH_PORT"},
	},
	&cli.StringFlag{
		Name: "target-nodes-refresh-metric",
		// The value is placed after "host:" when the /metrics URL is built,
		// so it is the metrics port, not the refresh API port.
		Usage:       "Port for the Prometheus metrics endpoint of the target nodes",
		Required:    true,
		Destination: &targetNodesRefreshMetricFlag,
		EnvVars:     []string{"TARGET_NODES_REFRESH_METRIC"},
	},
}

func main() {

app := cli.NewApp()
app.Name = "cache-agent-refresh"
app.Version = fmt.Sprintf("%s, commit '%s', branch '%s'", version, commit, branch)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit: instead '%s' you should probably use %q.

app.Flags = flags
app.Writer = os.Stderr

app.Action = func(ctx *cli.Context) error {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would improve the readability if you would extract this whole block as a normal named function.


nodes := strings.Split(targetNodesFlag, ",")
var targetNode string
// Check if the nodes are synced
for _, node := range nodes {
fmt.Printf("Checking node %s\n", node)
synced, _ := checkNodeStatus(node, targetNodesJsonPortFlag)
if synced {
targetNode = node
break
}
}

if targetNode == "" {
fmt.Println("No synced nodes found")
return fmt.Errorf("No synced nodes found")
}

// Get the prometheus metrics
prometheus_url := fmt.Sprintf("http://%s:%d/metrics", targetNode, targetNodesRefreshMetricFlag)
resp, err := http.Get(prometheus_url)
if err != nil {
log.Fatalf("Error getting prometheus metrics: %v", err)
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Error reading prometheus metrics: %v", err)
}

// Get the current epoch
nodeStatus, err := get_status(targetNode, targetNodesJsonPortFlag)
if err != nil {
log.Fatalf("Error getting node status: %v", err)
}
current_epoch := get_current_epoch(layersPerEpoch, nodeStatus.LatestLayer)

// refresh the cache of smeshers, overview and circulation
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
refresh_cache(targetNode, targetNodesRefreshPortFlag, refresh_path_overview, max_interval_overview, string(body))
}()

go func() {
defer wg.Done()
refresh_cache(targetNode, targetNodesRefreshPortFlag, refresh_path_circulation, max_interval_circulation, string(body))
}()

go func() {
defer wg.Done()
refresh_cache(targetNode, targetNodesRefreshPortFlag, refresh_path_smeshers, max_interval_smeshers, string(body))
}()

wg.Wait()

wg.Add(3)

go func() {
defer wg.Done()
path := epoch_replace(refresh_path_epoch, current_epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_epoch_current, string(body))
}()

go func() {
defer wg.Done()
path := epoch_replace(refresh_path_epoch_decentral, current_epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_epoch_current, string(body))
}()

go func() {
defer wg.Done()
path := epoch_replace(refresh_path_smeshers_per_epoch, current_epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_smeshers_current, string(body))
}()

wg.Wait()

// refresh the cache of the past epoch
for epoch := current_epoch - 1; epoch >= 0; epoch-- {
wg.Add(3)
go func(epoch int) {
defer wg.Done()
path := epoch_replace(refresh_path_epoch, epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_epoch_past, string(body))
}(epoch)

go func(epoch int) {
defer wg.Done()
path := epoch_replace(refresh_path_epoch_decentral, epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_epoch_past, string(body))
}(epoch)

go func(epoch int) {
defer wg.Done()
path := epoch_replace(refresh_path_smeshers_per_epoch, epoch)
refresh_cache(targetNode, targetNodesRefreshPortFlag, path, max_interval_smeshers_past, string(body))
}(epoch)

wg.Wait()
}
return nil
}

log.Println("Cache refresh completed")
os.Exit(0)

}
Loading