Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution SDK on Cosmos #1463

Merged
merged 25 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4249445
Change import path name of cosmos-sdk/types in cosmos/module.go
NicolasMahe Nov 19, 2019
04c9251
Add cosmos handler wrapper that requires a hash to be returned. It em…
NicolasMahe Nov 19, 2019
444dfe8
add stream function to cosmos client
NicolasMahe Nov 19, 2019
7bd27f7
Merge branch 'feature/cosmos-event'
NicolasMahe Nov 19, 2019
499b03b
Merge branch 'feature/container-runner'
NicolasMahe Nov 19, 2019
054baf3
Add executorHash to execution
NicolasMahe Nov 19, 2019
7fb58e4
Move execution sdk to cosmos sdk
NicolasMahe Nov 19, 2019
262b870
Update codebase to reflect modification of previous commit
NicolasMahe Nov 19, 2019
3216db2
Remove useless files from database package
NicolasMahe Nov 19, 2019
02e26b8
Update e2e test service
NicolasMahe Nov 19, 2019
6794af4
Merge branch 'dev' into feature/execution-store
Nov 19, 2019
a54e690
Merge branch 'dev' into feature/execution-store
krhubert Nov 19, 2019
1da0b08
Refactor exeuction
krhubert Nov 19, 2019
158e204
Select context done in execution stream
krhubert Nov 20, 2019
7f806ea
Merge branch dev
NicolasMahe Nov 20, 2019
237e85d
remove instanceHash from CreateExecutionRequest
NicolasMahe Nov 20, 2019
e78699b
Refactor
krhubert Nov 20, 2019
8168750
Remove commented code + fix linter
krhubert Nov 21, 2019
40e42a8
remove ErrTxInCache in execution sdk
NicolasMahe Nov 22, 2019
a16664f
Remove check on process in execution because process is not on cosmos…
NicolasMahe Nov 22, 2019
6d37b56
fix hash returned on error in runner backend handler function
NicolasMahe Nov 22, 2019
713b7a1
check for existing runner in runner sdk create
NicolasMahe Nov 22, 2019
a552e89
check for existing execution in execution sdk backend create function
NicolasMahe Nov 22, 2019
f17d871
Merge branch 'dev' into feature/execution-store
antho1404 Nov 22, 2019
adf0b71
fix lint
antho1404 Nov 22, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,16 @@ import (
"github.com/mesg-foundation/engine/version"
"github.com/mesg-foundation/engine/x/xerrors"
"github.com/mesg-foundation/engine/x/xnet"
"github.com/mesg-foundation/engine/x/xrand"
"github.com/mesg-foundation/engine/x/xsignal"
"github.com/sirupsen/logrus"
tmtypes "github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
)

func initDatabases(cfg *config.Config) (*database.LevelDBExecutionDB, *database.LevelDBProcessDB, error) {
// init execution db.
executionDB, err := database.NewExecutionDB(filepath.Join(cfg.Path, cfg.Database.ExecutionRelativePath))
if err != nil {
return nil, nil, err
}
func initDatabases(cfg *config.Config) (*database.LevelDBProcessDB, error) {
// init process db.
processDB, err := database.NewProcessDB(filepath.Join(cfg.Path, cfg.Database.ProcessRelativePath))
if err != nil {
return nil, nil, err
}
return executionDB, processDB, nil
return database.NewProcessDB(filepath.Join(cfg.Path, cfg.Database.ProcessRelativePath))
}

func stopRunningServices(sdk *enginesdk.SDK, cfg *config.Config, address string) error {
Expand Down Expand Up @@ -123,6 +115,8 @@ func loadOrGenDevGenesis(app *cosmos.App, kb *cosmos.Keybase, cfg *config.Config
}

func main() {
xrand.SeedInit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear what's the reason for this. Is it for the subscriber name that we generate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. but i think the goal of the package xrand is to centralized on random related function.
So we don't have the send the random everywhere in the code. it's done once and that's it.


cfg, err := config.New()
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
Expand All @@ -132,7 +126,7 @@ func main() {
logger.Init(cfg.Log.Format, cfg.Log.Level, cfg.Log.ForceColors)

// init databases
executionDB, processDB, err := initDatabases(cfg)
processDB, err := initDatabases(cfg)
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
Expand All @@ -155,7 +149,7 @@ func main() {

// register the backend modules to the app factory.
// TODO: this is a mandatory call so it should return a new types required by cosmos.NewApp
enginesdk.NewBackend(appFactory)
backend := enginesdk.NewBackend(appFactory)

// init cosmos app
app, err := cosmos.NewApp(appFactory)
Expand Down Expand Up @@ -191,7 +185,9 @@ func main() {
client := cosmos.NewClient(node, kb, genesis.ChainID)

// init sdk
sdk := enginesdk.New(client, kb, executionDB, processDB, container, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint)
sdk := enginesdk.New(client, kb, processDB, container, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint)
// TODO: this is a hack and will be remove when process sdk is running on cosmos
backend.Execution.SetProcessSDK(sdk.Process)

// start tendermint node
logrus.WithField("module", "main").WithField("seeds", cfg.Tendermint.Config.P2P.Seeds).Info("starting tendermint node")
Expand All @@ -211,7 +207,7 @@ func main() {
}()

logrus.WithField("module", "main").Info("starting process engine")
s := orchestrator.New(sdk.Event, sdk.Execution, sdk.Process)
s := orchestrator.New(sdk.Event, sdk.Execution, sdk.Process, sdk.Runner, cfg.Account.Name, cfg.Account.Password)
go func() {
if err := s.Start(); err != nil {
logrus.WithField("module", "main").Fatalln(err)
Expand Down
33 changes: 33 additions & 0 deletions cosmos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/mesg-foundation/engine/codec"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/x/xreflect"
"github.com/mesg-foundation/engine/x/xstrings"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/node"
rpcclient "github.com/tendermint/tendermint/rpc/client"
Expand Down Expand Up @@ -129,3 +131,34 @@ func (c *Client) BuildAndBroadcastMsg(msg sdktypes.Msg, accName, accPassword str
return nil, errors.New("i/o timeout")
}
}

// Stream subscribes to the provided query and returns the hash of the matching ressources.
func (c *Client) Stream(query string) (chan hash.Hash, error) {
subscriber := xstrings.RandAsciiLetters(8)
eventStream, err := c.Subscribe(context.Background(), subscriber, query)
if err != nil {
return nil, err
}

hashC := make(chan hash.Hash)
go func() {
for event := range eventStream {
ressHashes := event.Events[EventHashType]
if len(ressHashes) != 1 {
// or panic(err) - grpc api do not support
krhubert marked this conversation as resolved.
Show resolved Hide resolved
// return the errors on the stream for now
// so besieds logging the error, it not
// much we can do here. same belove
continue
}

hash, err := hash.Decode(ressHashes[0])
if err != nil {
continue
}
hashC <- hash
}
c.Unsubscribe(context.Background(), subscriber, query)
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
}()
return hashC, nil
}
62 changes: 46 additions & 16 deletions cosmos/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (

"github.com/cosmos/cosmos-sdk/client/context"
cosmoscodec "github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
cosmostypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/gorilla/mux"
"github.com/mesg-foundation/engine/codec"
"github.com/mesg-foundation/engine/hash"
"github.com/spf13/cobra"
abci "github.com/tendermint/tendermint/abci/types"
)
Expand All @@ -27,12 +28,15 @@ type AppModuleBasic struct {
// AppModule is a main element of an cosmos app.
type AppModule struct {
AppModuleBasic
handler sdk.Handler
handler Handler
querier Querier
}

// Handler defines the core of the state transition function of an application.
type Handler func(request cosmostypes.Request, msg cosmostypes.Msg) (hash.Hash, cosmostypes.Error)
krhubert marked this conversation as resolved.
Show resolved Hide resolved

// Querier is responsible to answer to ABCI queries.
type Querier func(request sdk.Request, path []string, req abci.RequestQuery) (res interface{}, err error)
type Querier func(request cosmostypes.Request, path []string, req abci.RequestQuery) (res interface{}, err error)

// NewAppModuleBasic inits an AppModuleBasic using a name.
func NewAppModuleBasic(name string) AppModuleBasic {
Expand All @@ -42,7 +46,7 @@ func NewAppModuleBasic(name string) AppModuleBasic {
}

// NewAppModule inits an AppModule using an AppModuleBasic, Handler and Querier.
func NewAppModule(moduleBasic AppModuleBasic, handler sdk.Handler, querier Querier) AppModule {
func NewAppModule(moduleBasic AppModuleBasic, handler Handler, querier Querier) AppModule {
return AppModule{
AppModuleBasic: moduleBasic,
handler: handler,
Expand Down Expand Up @@ -91,16 +95,42 @@ func (AppModuleBasic) GetTxCmd(cdc *cosmoscodec.Codec) *cobra.Command {
// ----------------------------------------------

// RegisterInvariants registers invariants to the registry.
func (m AppModule) RegisterInvariants(ir sdk.InvariantRegistry) {}
func (m AppModule) RegisterInvariants(ir cosmostypes.InvariantRegistry) {}

// Route returns the route prefix for transaction of the module.
func (m AppModule) Route() string {
return m.name
}

// NewHandler returns the handler used to apply transactions.
func (m AppModule) NewHandler() sdk.Handler {
return m.handler
func (m AppModule) NewHandler() cosmostypes.Handler {
return func(request cosmostypes.Request, msg cosmostypes.Msg) cosmostypes.Result {
hash, err := m.handler(request, msg)
if err != nil {
return err.Result()
}

events := request.EventManager().Events()
events = events.AppendEvent(
cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(cosmostypes.AttributeKeyModule, m.name),
),
)

if hash != nil {
events = events.AppendEvent(
cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(AttributeKeyHash, hash.String()),
),
)
}
return cosmostypes.Result{
Data: hash,
Events: events,
}
}
}

// QuerierRoute the route prefix for query of the module.
Expand All @@ -109,37 +139,37 @@ func (m AppModule) QuerierRoute() string {
}

// NewQuerierHandler returns the handler used to reply ABCI query.
func (m AppModule) NewQuerierHandler() sdk.Querier {
return func(request sdk.Request, path []string, req abci.RequestQuery) ([]byte, sdk.Error) {
func (m AppModule) NewQuerierHandler() cosmostypes.Querier {
return func(request cosmostypes.Request, path []string, req abci.RequestQuery) ([]byte, cosmostypes.Error) {
data, err := m.querier(request, path, req)
if err != nil {
if errsdk, ok := err.(sdk.Error); ok {
if errsdk, ok := err.(cosmostypes.Error); ok {
return nil, errsdk
}
return nil, sdk.ErrInternal(err.Error())
return nil, cosmostypes.ErrInternal(err.Error())
}
res, err := codec.MarshalBinaryBare(data)
if err != nil {
return nil, sdk.ErrInternal(err.Error())
return nil, cosmostypes.ErrInternal(err.Error())
}
return res, nil
}
}

// BeginBlock is called at the beginning of the process of a new block.
func (m AppModule) BeginBlock(_ sdk.Request, _ abci.RequestBeginBlock) {}
func (m AppModule) BeginBlock(_ cosmostypes.Request, _ abci.RequestBeginBlock) {}

// EndBlock is called at the end of the process of a new block.
func (m AppModule) EndBlock(sdk.Request, abci.RequestEndBlock) []abci.ValidatorUpdate {
func (m AppModule) EndBlock(cosmostypes.Request, abci.RequestEndBlock) []abci.ValidatorUpdate {
return []abci.ValidatorUpdate{}
}

// InitGenesis initializes the genesis from a request and data.
func (m AppModule) InitGenesis(request sdk.Request, data json.RawMessage) []abci.ValidatorUpdate {
func (m AppModule) InitGenesis(request cosmostypes.Request, data json.RawMessage) []abci.ValidatorUpdate {
return []abci.ValidatorUpdate{}
}

// ExportGenesis exports the current state of the app.
func (m AppModule) ExportGenesis(request sdk.Request) json.RawMessage {
func (m AppModule) ExportGenesis(request cosmostypes.Request) json.RawMessage {
return []byte("{}")
}
24 changes: 24 additions & 0 deletions cosmos/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cosmos

import (
"fmt"

cosmostypes "github.com/cosmos/cosmos-sdk/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
)

// common attribute keys.
const (
AttributeKeyHash = "hash"
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
)

// EventHashType
var EventHashType = cosmostypes.EventTypeMessage + "." + AttributeKeyHash

func EventActionQuery(msgType string) string {
return fmt.Sprintf("%s.%s='%s'", sdktypes.EventTypeMessage, sdktypes.AttributeKeyAction, msgType)
}

func EventModuleQuery(module string) string {
return fmt.Sprintf("%s.%s='%s'", sdktypes.EventTypeMessage, sdktypes.AttributeKeyModule, module)
}
120 changes: 0 additions & 120 deletions database/execution_db.go

This file was deleted.

Loading