Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: add orchestrator command (#150)
Browse files Browse the repository at this point in the history
* feat: add orchestrator command

* chore: update after merging main

* Update cmd/qgb/orchestrator/cmd.go
  • Loading branch information
rach-id authored Feb 20, 2023
1 parent dc40f9b commit 7559102
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 2 deletions.
148 changes: 146 additions & 2 deletions cmd/qgb/orchestrator/cmd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
package orchestrator

import (
"context"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/app/encoding"
"github.com/celestiaorg/orchestrator-relayer/cmd/qgb/helpers"
"github.com/celestiaorg/orchestrator-relayer/orchestrator"
"github.com/celestiaorg/orchestrator-relayer/p2p"
"github.com/celestiaorg/orchestrator-relayer/rpc"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"
tmlog "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/rpc/client/http"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func Command() *cobra.Command {
Expand All @@ -10,15 +31,138 @@ func Command() *cobra.Command {
Aliases: []string{"orch"},
Short: "Runs the QGB orchestrator to sign attestations",
RunE: func(cmd *cobra.Command, args []string) error {
_, err := parseOrchestratorFlags(cmd)
config, err := parseOrchestratorFlags(cmd)
if err != nil {
return err
}
logger := tmlog.NewTMLogger(os.Stdout)

logger.Debug("initializing orchestrator")

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

// creating an RPC connection to tendermint
trpc, err := http.New(config.tendermintRPC, "/websocket")
if err != nil {
return err
}
err = trpc.Start()
if err != nil {
return err
}
defer func(trpc *http.HTTP) {
err := trpc.Stop()
if err != nil {
logger.Error(err.Error())
}
}(trpc)

// creating tendermint querier
tmQuerier := rpc.NewTmQuerier(trpc, logger)
if err != nil {
return err
}

// creating a grpc connection to Celestia-app
qgbGRPC, err := grpc.Dial(config.celesGRPC, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer func(qgbGRPC *grpc.ClientConn) {
err := qgbGRPC.Close()
if err != nil {
logger.Error(err.Error())
}
}(qgbGRPC)

// creating the application querier
appQuerier := rpc.NewAppQuerier(logger, qgbGRPC, encCfg)

// creating the host
h, err := libp2p.New()
if err != nil {
return err
}
logger.Info(
"created host",
"ID",
h.ID().String(),
"Addresses",
h.Addrs(),
)
// creating the data store
dataStore := dssync.MutexWrap(ds.NewMapDatastore())

// TODO add implementation
// get the bootstrappers
var bootstrappers []peer.AddrInfo
if config.bootstrappers == "" {
bootstrappers = nil
} else {
bs := strings.Split(config.bootstrappers, ",")
bootstrappers, err = helpers.ParseAddrInfos(logger, bs)
if err != nil {
return err
}
}

// creating the dht
dht, err := p2p.NewQgbDHT(cmd.Context(), h, dataStore, bootstrappers, logger)
if err != nil {
return err
}

// wait for the dht to have some peers
err = dht.WaitForPeers(cmd.Context(), time.Hour, 10*time.Second, 1)
if err != nil {
return err
}

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)

broadcaster := orchestrator.NewBroadcaster(dht)
if err != nil {
return err
}

retrier := orchestrator.NewRetrier(logger, 5, 15*time.Second)
orch, err := orchestrator.New(
logger,
appQuerier,
tmQuerier,
p2pQuerier,
broadcaster,
retrier,
*config.privateKey,
)
if err != nil {
panic(err)
}

logger.Debug("starting orchestrator")

// Listen for and trap any OS signal to gracefully shutdown and exit
go trapSignal(logger, cancel)

orch.Start(ctx)

return nil
},
}
return addOrchestratorFlags(command)
}

// trapSignal will listen for any OS signal and gracefully exit.
func trapSignal(logger tmlog.Logger, cancel context.CancelFunc) {
sigCh := make(chan os.Signal, 1)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

sig := <-sigCh
logger.Info("caught signal; shutting down...", "signal", sig.String())
cancel()
}
8 changes: 8 additions & 0 deletions cmd/qgb/orchestrator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
celestiaGRPCFlag = "celes-grpc"
evmPrivateKeyFlag = "evm-priv-key"
tendermintRPCFlag = "celes-http-rpc"
bootstrappersFlag = "bootstrappers"
)

func addOrchestratorFlags(cmd *cobra.Command) *cobra.Command {
Expand All @@ -26,12 +27,14 @@ func addOrchestratorFlags(cmd *cobra.Command) *cobra.Command {
"",
"Specify the ECDSA private key used to sign orchestrator commitments in hex",
)
cmd.Flags().StringP(bootstrappersFlag, "b", "", "Comma-separated multiaddresses of p2p peers to connect to")
return cmd
}

type Config struct {
celestiaChainID, celesGRPC, tendermintRPC string
privateKey *ecdsa.PrivateKey
bootstrappers string
}

func parseOrchestratorFlags(cmd *cobra.Command) (Config, error) {
Expand All @@ -58,11 +61,16 @@ func parseOrchestratorFlags(cmd *cobra.Command) (Config, error) {
if err != nil {
return Config{}, err
}
bootstrappers, err := cmd.Flags().GetString(bootstrappersFlag)
if err != nil {
return Config{}, err
}

return Config{
privateKey: evmPrivKey,
celestiaChainID: chainID,
celesGRPC: celesGRPC,
tendermintRPC: tendermintRPC,
bootstrappers: bootstrappers,
}, nil
}

0 comments on commit 7559102

Please sign in to comment.