diff --git a/.github/workflows/tfrobot_test.yml b/.github/workflows/tfrobot_test.yml index c3f28db03..8108139c0 100644 --- a/.github/workflows/tfrobot_test.yml +++ b/.github/workflows/tfrobot_test.yml @@ -33,8 +33,14 @@ jobs: env: MNEMONIC: ${{ secrets.MNEMONICS }} NETWORK: main - # run: make integration run: | go run main.go deploy -c ./example/test.yaml + + - name: Cleanup + if: always() + env: + MNEMONIC: ${{ secrets.MNEMONICS }} + NETWORK: main + run: | sleep 120 # sleep to make sure graphql is up to date go run main.go cancel -c ./example/test.yaml -d diff --git a/grid-cli/cmd/deploy_kubernetes.go b/grid-cli/cmd/deploy_kubernetes.go index 840e69e73..95796f540 100644 --- a/grid-cli/cmd/deploy_kubernetes.go +++ b/grid-cli/cmd/deploy_kubernetes.go @@ -200,14 +200,18 @@ var deployKubernetesCmd = &cobra.Command{ disks, nil, rootfss, + uint64(len(workers)), ) if err != nil { log.Fatal().Err(err).Send() } - workersNode = uint32(workersNodes[0].NodeID) - } - for i := 0; i < workerNumber; i++ { - workers[i].Node = workersNode + for i, node := range workersNodes { + workers[i].Node = uint32(node.NodeID) + } + } else { + for i := 0; i < workerNumber; i++ { + workers[i].Node = workersNode + } } cluster, err := command.DeployKubernetesCluster(cmd.Context(), t, master, workers, string(sshKey)) if err != nil { @@ -291,5 +295,4 @@ func init() { deployKubernetesCmd.Flags().Bool("ipv6", false, "assign public ipv6 for master") deployKubernetesCmd.Flags().Bool("ygg", true, "assign yggdrasil ip for master") deployKubernetesCmd.Flags().Bool("mycelium", true, "assign mycelium ip for master") - } diff --git a/grid-cli/internal/cmd/deploy.go b/grid-cli/internal/cmd/deploy.go index 69696e1aa..f9830696d 100644 --- a/grid-cli/internal/cmd/deploy.go +++ b/grid-cli/internal/cmd/deploy.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net" + "slices" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -57,9 +58,12 @@ func DeployKubernetesCluster(ctx context.Context, t deployer.TFPluginClient, mas networkName := fmt.Sprintf("%snetwork", master.Name) projectName := fmt.Sprintf("kubernetes/%s", master.Name) networkNodes := []uint32{master.Node} - if len(workers) > 0 && workers[0].Node != master.Node { - networkNodes = append(networkNodes, workers[0].Node) + for _, worker := range workers { + if !slices.Contains(networkNodes, worker.Node) { + networkNodes = append(networkNodes, worker.Node) + } } + network, err := buildNetwork(networkName, projectName, networkNodes, len(master.MyceliumIPSeed) != 0) if err != nil { return workloads.K8sCluster{}, err diff --git a/grid-client/deployer/tf_plugin_client.go b/grid-client/deployer/tf_plugin_client.go index 1a49e2fd9..a70ac2b93 100644 --- a/grid-client/deployer/tf_plugin_client.go +++ b/grid-client/deployer/tf_plugin_client.go @@ -141,7 +141,7 @@ func parsePluginOpts(opts ...PluginOpt) (pluginCfg, error) { } if cfg.network != DevNetwork && cfg.network != QaNetwork && cfg.network != TestNetwork && cfg.network != MainNetwork { - return cfg, errors.Errorf("network must be one of dev, qa, test, and main not %s", cfg.network) + return cfg, errors.Errorf("network must be one of %s, %s, %s, and %s not %s", DevNetwork, QaNetwork, TestNetwork, MainNetwork, cfg.network) } if len(cfg.proxyURLs) == 0 { @@ -204,12 +204,12 @@ func NewTFPluginClient( var identity substrate.Identity switch cfg.keyType { - case "ed25519": + case peer.KeyTypeEd25519: identity, err = substrate.NewIdentityFromEd25519Phrase(tfPluginClient.mnemonicOrSeed) - case "sr25519": + case peer.KeyTypeSr25519: identity, err = substrate.NewIdentityFromSr25519Phrase(tfPluginClient.mnemonicOrSeed) default: - err = errors.Errorf("key type must be one of ed25519 and sr25519 not %s", cfg.keyType) + err = errors.Errorf("key type must be one of %s and %s not %s", peer.KeyTypeEd25519, peer.KeyTypeSr25519, cfg.keyType) } if err != nil { diff --git a/grid-client/graphql/graphql.go b/grid-client/graphql/graphql.go index b86337ddc..db37aab7b 100644 --- a/grid-client/graphql/graphql.go +++ b/grid-client/graphql/graphql.go @@ -9,17 +9,24 @@ import ( "net/http" "time" + "github.com/cenkalti/backoff" "github.com/pkg/errors" + "github.com/rs/zerolog/log" ) // GraphQl for tf graphql type GraphQl struct { - url string + urls []string + activeStackIdx int } // NewGraphQl new tf graphql -func NewGraphQl(url string) (GraphQl, error) { - return GraphQl{url}, nil +func NewGraphQl(urls ...string) (GraphQl, error) { + if len(urls) == 0 { + return GraphQl{}, errors.New("graphql url is required") + } + + return GraphQl{urls: urls, activeStackIdx: 0}, nil } // GetItemTotalCount return count of items @@ -34,10 +41,7 @@ func (g *GraphQl) GetItemTotalCount(itemName string, options string) (float64, e bodyReader := bytes.NewReader(jsonBody) - cl := &http.Client{ - Timeout: 10 * time.Second, - } - countResponse, err := cl.Post(g.url, "application/json", bodyReader) + countResponse, err := g.httpPost(bodyReader) if err != nil { return 0, err } @@ -66,10 +70,7 @@ func (g *GraphQl) Query(body string, variables map[string]interface{}) (map[stri bodyReader := bytes.NewReader(jsonBody) - cl := &http.Client{ - Timeout: 10 * time.Second, - } - resp, err := cl.Post(g.url, "application/json", bodyReader) + resp, err := g.httpPost(bodyReader) if err != nil { return result, err } @@ -103,3 +104,44 @@ func parseHTTPResponse(resp *http.Response) (map[string]interface{}, error) { return data, nil } + +func (g *GraphQl) httpPost(body io.Reader) (*http.Response, error) { + cl := &http.Client{ + Timeout: 10 * time.Second, + } + + var ( + endpoint string + reqErr error + resp *http.Response + ) + + backoffCfg := backoff.WithMaxRetries( + backoff.NewConstantBackOff(1*time.Millisecond), + 2, + ) + + err := backoff.RetryNotify(func() error { + endpoint = g.urls[g.activeStackIdx] + log.Debug().Str("url", endpoint).Msg("checking") + + resp, reqErr = cl.Post(endpoint, "application/json", body) + if reqErr != nil && + (errors.Is(reqErr, http.ErrAbortHandler) || + errors.Is(reqErr, http.ErrHandlerTimeout) || + errors.Is(reqErr, http.ErrServerClosed)) { + g.activeStackIdx = (g.activeStackIdx + 1) % len(g.urls) + return reqErr + } + + return nil + }, backoffCfg, func(err error, _ time.Duration) { + log.Error().Err(err).Msg("failed to connect to endpoint, retrying") + }) + + if err != nil { + log.Error().Err(err).Msg("failed to connect to endpoint") + } + + return resp, reqErr +} diff --git a/grid-proxy/go.mod b/grid-proxy/go.mod index 6adf7b1c7..b5a8af3aa 100644 --- a/grid-proxy/go.mod +++ b/grid-proxy/go.mod @@ -3,6 +3,7 @@ module github.com/threefoldtech/tfgrid-sdk-go/grid-proxy go 1.21 require ( + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v3 v3.2.2 github.com/go-acme/lego/v4 v4.16.1 github.com/google/go-cmp v0.6.0 @@ -28,7 +29,6 @@ require ( github.com/ChainSafe/go-schnorrkel v1.1.0 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/blang/semver v3.5.1+incompatible // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect diff --git a/grid-proxy/pkg/client/grid_client.go b/grid-proxy/pkg/client/grid_client.go index b58ec7dc0..8dc2826fb 100644 --- a/grid-proxy/pkg/client/grid_client.go +++ b/grid-proxy/pkg/client/grid_client.go @@ -13,8 +13,10 @@ import ( "strings" "time" + "github.com/cenkalti/backoff" "github.com/gorilla/schema" "github.com/pkg/errors" + "github.com/rs/zerolog/log" "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types" ) @@ -52,15 +54,23 @@ type Client interface { // Clientimpl concrete implementation of the client to communicate with the grid proxy type Clientimpl struct { - endpoint string + endpoints []string + activeStackIdx int } // NewClient grid proxy client constructor -func NewClient(endpoint string) Client { - if endpoint[len(endpoint)-1] != '/' { - endpoint += "/" +func NewClient(endpoints ...string) Client { + for i, endpoint := range endpoints { + if endpoint[len(endpoint)-1] != '/' { + endpoints[i] += "/" + } + } + + proxy := Clientimpl{ + endpoints: endpoints, + activeStackIdx: 0, } - proxy := Clientimpl{endpoint} + return &proxy } @@ -105,18 +115,7 @@ func RequestPages(r *http.Response) (uint64, error) { // Ping makes sure the server is up func (g *Clientimpl) Ping() error { - client := g.newHTTPClient() - url, err := g.prepareURL("ping") - if err != nil { - return errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return err - } - - res, err := client.Do(req) + res, err := g.httpGet("ping") if res != nil { defer res.Body.Close() } @@ -127,24 +126,12 @@ func (g *Clientimpl) Ping() error { if res.StatusCode != http.StatusOK { return fmt.Errorf("non ok return status code from the the grid proxy home page: %s", http.StatusText(res.StatusCode)) } - return nil } // Nodes returns nodes with the given filters and pagination parameters func (g *Clientimpl) Nodes(ctx context.Context, filter types.NodeFilter, limit types.Limit) (nodes []types.Node, totalCount int, err error) { - client := g.newHTTPClient() - url, err := g.prepareURL("nodes", filter, limit) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return - } - - res, err := client.Do(req) + res, err := g.httpGet("nodes", filter, limit) if res != nil { defer res.Body.Close() } @@ -166,18 +153,7 @@ func (g *Clientimpl) Nodes(ctx context.Context, filter types.NodeFilter, limit t // Farms returns farms with the given filters and pagination parameters func (g *Clientimpl) Farms(ctx context.Context, filter types.FarmFilter, limit types.Limit) (farms []types.Farm, totalCount int, err error) { - url, err := g.prepareURL("farms", filter, limit) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to prepare url") - } - - client := g.newHTTPClient() - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return - } - - res, err := client.Do(req) + res, err := g.httpGet("farms", filter, limit) if res != nil { defer res.Body.Close() } @@ -203,18 +179,7 @@ func (g *Clientimpl) Farms(ctx context.Context, filter types.FarmFilter, limit t // Twins returns twins with the given filters and pagination parameters func (g *Clientimpl) Twins(ctx context.Context, filter types.TwinFilter, limit types.Limit) (twins []types.Twin, totalCount int, err error) { - url, err := g.prepareURL("twins", filter, limit) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to prepare url") - } - - client := g.newHTTPClient() - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return - } - - res, err := client.Do(req) + res, err := g.httpGet("twins", filter, limit) if res != nil { defer res.Body.Close() } @@ -240,18 +205,7 @@ func (g *Clientimpl) Twins(ctx context.Context, filter types.TwinFilter, limit t // Contracts returns contracts with the given filters and pagination parameters func (g *Clientimpl) Contracts(ctx context.Context, filter types.ContractFilter, limit types.Limit) (contracts []types.Contract, totalCount int, err error) { - url, err := g.prepareURL("contracts", filter, limit) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to prepare url") - } - - client := g.newHTTPClient() - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return - } - - res, err := client.Do(req) + res, err := g.httpGet("contracts", filter, limit) if res != nil { defer res.Body.Close() } @@ -274,23 +228,12 @@ func (g *Clientimpl) Contracts(ctx context.Context, filter types.ContractFilter, // Node returns the node with the give id func (g *Clientimpl) Node(ctx context.Context, nodeID uint32) (node types.NodeWithNestedCapacity, err error) { - client := g.newHTTPClient() - url, err := g.prepareURL(fmt.Sprintf("nodes/%d", nodeID)) - if err != nil { - return types.NodeWithNestedCapacity{}, errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return types.NodeWithNestedCapacity{}, fmt.Errorf("failed to create node request: %w", err) - } - - res, err := client.Do(req) + res, err := g.httpGet(fmt.Sprintf("nodes/%d", nodeID)) if res != nil { defer res.Body.Close() } if err != nil { - return types.NodeWithNestedCapacity{}, err + return } if res.StatusCode != http.StatusOK { @@ -307,23 +250,12 @@ func (g *Clientimpl) Node(ctx context.Context, nodeID uint32) (node types.NodeWi // NodeStatus returns the node status up/down func (g *Clientimpl) NodeStatus(ctx context.Context, nodeID uint32) (status types.NodeStatus, err error) { - client := g.newHTTPClient() - url, err := g.prepareURL(fmt.Sprintf("nodes/%d/status", nodeID)) - if err != nil { - return types.NodeStatus{}, errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return types.NodeStatus{}, fmt.Errorf("failed to create nodes request: %w", err) - } - - res, err := client.Do(req) + res, err := g.httpGet(fmt.Sprintf("nodes/%d/status", nodeID)) if res != nil { defer res.Body.Close() } if err != nil { - return types.NodeStatus{}, err + return } if res.StatusCode != http.StatusOK { @@ -338,23 +270,12 @@ func (g *Clientimpl) NodeStatus(ctx context.Context, nodeID uint32) (status type // Stats return statistics about the grid func (g *Clientimpl) Stats(ctx context.Context, filter types.StatsFilter) (stats types.Stats, err error) { - url, err := g.prepareURL("stats", filter) - if err != nil { - return types.Stats{}, errors.Wrap(err, "failed to prepare url") - } - - client := g.newHTTPClient() - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return types.Stats{}, fmt.Errorf("failed to create stats request: %w", err) - } - - res, err := client.Do(req) + res, err := g.httpGet("stats", filter) if res != nil { defer res.Body.Close() } if err != nil { - return types.Stats{}, err + return } if res.StatusCode != http.StatusOK { @@ -369,18 +290,7 @@ func (g *Clientimpl) Stats(ctx context.Context, filter types.StatsFilter) (stats // Contract returns a single contract based on the contractID func (g *Clientimpl) Contract(ctx context.Context, contractID uint32) (types.Contract, error) { - client := g.newHTTPClient() - url, err := g.prepareURL(fmt.Sprintf("contracts/%d", contractID)) - if err != nil { - return types.Contract{}, errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return types.Contract{}, fmt.Errorf("failed to create contract request: %w", err) - } - - res, err := client.Do(req) + res, err := g.httpGet(fmt.Sprintf("contracts/%d", contractID)) if res != nil { defer res.Body.Close() } @@ -408,18 +318,7 @@ func (g *Clientimpl) Contract(ctx context.Context, contractID uint32) (types.Con // ContractBills returns all bills for a single contract based on contractID and pagination params func (g *Clientimpl) ContractBills(ctx context.Context, contractID uint32, limit types.Limit) ([]types.ContractBilling, uint, error) { - client := g.newHTTPClient() - url, err := g.prepareURL(fmt.Sprintf("contracts/%d/bills", contractID), limit) - if err != nil { - return nil, 0, errors.Wrap(err, "failed to prepare url") - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return nil, 0, err - } - - res, err := client.Do(req) + res, err := g.httpGet(fmt.Sprintf("contracts/%d/bills", contractID), limit) if res != nil { defer res.Body.Close() } @@ -474,7 +373,7 @@ func (g *Clientimpl) prepareURL(path string, params ...interface{}) (string, err } } - baseURL := g.endpoint + baseURL := g.endpoints[g.activeStackIdx] u, err := url.ParseRequestURI(baseURL) if err != nil { @@ -486,3 +385,45 @@ func (g *Clientimpl) prepareURL(path string, params ...interface{}) (string, err return u.String(), nil } + +func (g *Clientimpl) httpGet(path string, params ...interface{}) (resp *http.Response, reqErr error) { + client := g.newHTTPClient() + + backoffCfg := backoff.WithMaxRetries( + backoff.NewConstantBackOff(1*time.Millisecond), + 2, + ) + + err := backoff.RetryNotify(func() error { + url, err := g.prepareURL(path, params...) + if err != nil { + reqErr = errors.Wrap(err, "failed to prepare url") + return nil + } + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + reqErr = err + return nil + } + + resp, reqErr = client.Do(req) + if reqErr != nil && + (errors.Is(reqErr, http.ErrAbortHandler) || + errors.Is(reqErr, http.ErrHandlerTimeout) || + errors.Is(reqErr, http.ErrServerClosed)) { + g.activeStackIdx = (g.activeStackIdx + 1) % len(g.endpoints) + return reqErr + } + + return nil + }, backoffCfg, func(err error, _ time.Duration) { + log.Error().Err(err).Msg("failed to connect to endpoint, retrying") + }) + + if err != nil { + log.Error().Err(err).Msg("failed to connect to endpoint") + } + + return +} diff --git a/grid-proxy/pkg/client/grid_client_test.go b/grid-proxy/pkg/client/grid_client_test.go index 4fa3edf12..ce96db4d9 100644 --- a/grid-proxy/pkg/client/grid_client_test.go +++ b/grid-proxy/pkg/client/grid_client_test.go @@ -117,7 +117,7 @@ func MarshalNodeStatus(data []byte) (info types.NodeStatus) { return } -type ProxyFunc func(url string) Client +type ProxyFunc func(urls ...string) Client func TestConnectionFailures(t *testing.T) { testConnectionFailures(t, NewClient) @@ -356,7 +356,7 @@ func TestPrepareURL(t *testing.T) { endpoint := "http://www.gridproxy.com" client := Clientimpl{ - endpoint: endpoint, + endpoints: []string{endpoint}, } want := "http://www.gridproxy.com/nodes?status=st&free_mru=10&farm_ids=1&farm_ids=2&farm_ids=3&dedicated=true&size=50&page=1" diff --git a/grid-proxy/pkg/client/retrying_grid_client_test.go b/grid-proxy/pkg/client/retrying_grid_client_test.go index 776b3ba41..622ea8f07 100644 --- a/grid-proxy/pkg/client/retrying_grid_client_test.go +++ b/grid-proxy/pkg/client/retrying_grid_client_test.go @@ -61,8 +61,8 @@ func (r *requestCounter) ContractBills(ctx context.Context, contractID uint32, l return nil, 0, errors.New("error") } -func retryingConstructor(u string) Client { - return NewRetryingClientWithTimeout(NewClient(u), 1*time.Millisecond) +func retryingConstructor(u ...string) Client { + return NewRetryingClientWithTimeout(NewClient(u...), 1*time.Millisecond) } func TestRetryingConnectionFailures(t *testing.T) {