From bc2b445b0ed583eec27f14fd9797497ab0f0f969 Mon Sep 17 00:00:00 2001 From: krhubert Date: Sat, 15 Feb 2020 20:33:33 +0100 Subject: [PATCH] Refactor cosmos module - create cosmos module client - removed sdk packages - create runner/builder and event/publisher packages - remove orchestrator tests & mocks --- Dockerfile.tools | 1 - Makefile | 5 +- core/main.go | 28 +- cosmos/client.go | 17 -- cosmos/module_client.go | 284 ++++++++++++++++++++ event/event.go | 4 +- {sdk/event => event}/event_listener.go | 11 +- {sdk/event => event}/event_listener_test.go | 25 +- event/publisher/publisher.go | 56 ++++ go.mod | 2 - go.sum | 15 -- internal/tools/tools.go | 1 - orchestrator/mocks/EventSDK.go | 27 -- orchestrator/mocks/ExecutionSDK.go | 92 ------- orchestrator/mocks/ProcessSDK.go | 35 --- orchestrator/mocks/RunnerSDK.go | 36 --- orchestrator/orchestrator.go | 25 +- orchestrator/orchestrator_test.go | 271 ------------------- orchestrator/type.go | 43 +-- runner/builder/builder.go | 124 +++++++++ {sdk/runner => runner/builder}/container.go | 2 +- scripts/build-mocks.sh | 7 - sdk/event/event.go | 64 ----- sdk/execution/sdk.go | 118 -------- sdk/process/sdk.go | 66 ----- sdk/runner/sdk.go | 177 ------------ sdk/sdk.go | 38 --- sdk/service/sdk.go | 71 ----- server/grpc/api/event.go | 18 +- server/grpc/api/execution.go | 23 +- server/grpc/api/instance.go | 32 +-- server/grpc/api/ownership.go | 23 +- server/grpc/api/process.go | 20 +- server/grpc/api/runner.go | 42 +-- server/grpc/api/service.go | 26 +- server/grpc/server.go | 32 +-- x/instance/internal/keeper/querier.go | 4 +- 37 files changed, 615 insertions(+), 1250 deletions(-) create mode 100644 cosmos/module_client.go rename {sdk/event => event}/event_listener.go (85%) rename {sdk/event => event}/event_listener_test.go (70%) create mode 100644 event/publisher/publisher.go delete mode 100644 orchestrator/mocks/EventSDK.go delete mode 100644 orchestrator/mocks/ExecutionSDK.go delete mode 100644 orchestrator/mocks/ProcessSDK.go delete mode 100644 orchestrator/mocks/RunnerSDK.go delete mode 100644 orchestrator/orchestrator_test.go create mode 100644 runner/builder/builder.go rename {sdk/runner => runner/builder}/container.go (99%) delete mode 100755 scripts/build-mocks.sh delete mode 100644 sdk/event/event.go delete mode 100644 sdk/execution/sdk.go delete mode 100644 sdk/process/sdk.go delete mode 100644 sdk/runner/sdk.go delete mode 100644 sdk/sdk.go delete mode 100644 sdk/service/sdk.go diff --git a/Dockerfile.tools b/Dockerfile.tools index 40f76a47f..a31455ec6 100644 --- a/Dockerfile.tools +++ b/Dockerfile.tools @@ -38,7 +38,6 @@ RUN go mod download RUN go install github.com/golang/protobuf/protoc-gen-go RUN go install github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc -RUN go install github.com/vektra/mockery/.../ # verify that mesg-dev container is being used. ENV MESG_DEV true diff --git a/Makefile b/Makefile index f83e06987..246632d32 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all build build-cmd-cosmos changelog check-version clean clean-build clean-docker dep dev dev-mon dev-start dev-stop docker-build docker-dev docker-publish docker-publish-dev docker-tools genesis lint mock protobuf test +.PHONY: all build build-cmd-cosmos changelog check-version clean clean-build clean-docker dep dev dev-mon dev-start dev-stop docker-build docker-dev docker-publish docker-publish-dev docker-tools genesis lint protobuf test MAJOR_VERSION := $(shell echo $(version) | cut -d . -f 1) MINOR_VERSION := $(shell echo $(version) | cut -d . -f 1-2) @@ -67,9 +67,6 @@ test: dep lint: golangci-lint run -mock: docker-tools - docker run --rm -v $(PWD):/project mesg/tools:local ./scripts/build-mocks.sh - protobuf: docker-tools docker run --rm -v $(PWD):/project mesg/tools:local ./scripts/build-proto.sh diff --git a/core/main.go b/core/main.go index 6556f4ca4..8d1e439b7 100644 --- a/core/main.go +++ b/core/main.go @@ -17,6 +17,7 @@ import ( "github.com/mesg-foundation/engine/config" "github.com/mesg-foundation/engine/container" "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/ext/xerrors" "github.com/mesg-foundation/engine/ext/xnet" "github.com/mesg-foundation/engine/ext/xrand" @@ -25,8 +26,7 @@ import ( "github.com/mesg-foundation/engine/logger" "github.com/mesg-foundation/engine/orchestrator" "github.com/mesg-foundation/engine/protobuf/api" - enginesdk "github.com/mesg-foundation/engine/sdk" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" + "github.com/mesg-foundation/engine/runner/builder" "github.com/mesg-foundation/engine/server/grpc" "github.com/mesg-foundation/engine/version" "github.com/sirupsen/logrus" @@ -35,8 +35,8 @@ import ( db "github.com/tendermint/tm-db" ) -func stopRunningServices(sdk *enginesdk.SDK, address string) error { - runners, err := sdk.Runner.List(&runnersdk.Filter{Address: address}) +func stopRunningServices(mc *cosmos.ModuleClient, b *builder.Builder, address string) error { + runners, err := mc.ListRunner(&cosmos.FilterRunner{Address: address}) if err != nil { return err } @@ -49,7 +49,7 @@ func stopRunningServices(sdk *enginesdk.SDK, address string) error { for _, instance := range runners { go func(hash hash.Hash) { defer wg.Done() - err := sdk.Runner.Delete(&api.DeleteRunnerRequest{ + err := b.Delete(&api.DeleteRunnerRequest{ Hash: hash, DeleteData: false, }) @@ -128,9 +128,6 @@ func main() { // init logger. logger.Init(cfg.Log.Format, cfg.Log.Level, cfg.Log.ForceColors) - // init basicManager - // basicManager := enginesdk.NewBasicManager() - // init tendermint logger tendermintLogger := logger.TendermintLogger() @@ -182,8 +179,13 @@ func main() { logrus.WithField("module", "main").Fatalln(err) } - // init sdk - sdk := enginesdk.New(client, kb, container, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint) + mc := cosmos.NewModuleClient(client) + + // init runner builder + b := builder.New(mc, container, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint) + + // init event publiserh + ep := publisher.New(mc) // start tendermint node logrus.WithField("module", "main").WithField("seeds", cfg.Tendermint.Config.P2P.Seeds).Info("starting tendermint node") @@ -192,7 +194,7 @@ func main() { } // init gRPC server. - server := grpc.New(sdk, cfg, client) + server := grpc.New(mc, ep, b) logrus.WithField("module", "main").Infof("starting MESG Engine version %s", version.Version) @@ -203,7 +205,7 @@ func main() { }() logrus.WithField("module", "main").Info("starting process engine") - s := orchestrator.New(sdk.Event, sdk.Execution, sdk.Process, sdk.Runner) + s := orchestrator.New(mc, ep) go func() { if err := s.Start(); err != nil { logrus.WithField("module", "main").Fatalln(err) @@ -244,7 +246,7 @@ func main() { } logrus.WithField("module", "main").Info("stopping running services") - if err := stopRunningServices(sdk, acc.GetAddress().String()); err != nil { + if err := stopRunningServices(mc, b, acc.GetAddress().String()); err != nil { logrus.WithField("module", "main").Fatalln(err) } diff --git a/cosmos/client.go b/cosmos/client.go index 0bcfc4802..e040852db 100644 --- a/cosmos/client.go +++ b/cosmos/client.go @@ -75,23 +75,6 @@ func (c *Client) QueryJSON(path string, qdata, ptr interface{}) error { return c.cdc.UnmarshalJSON(result, ptr) } -// Query is abci.query wrapper with errors check and decode data. -func (c *Client) Query(path string, qdata, ptr interface{}) error { - var data []byte - if !xreflect.IsNil(qdata) { - b, err := c.cdc.MarshalBinaryBare(qdata) - if err != nil { - return err - } - data = b - } - result, _, err := c.QueryWithData(path, data) - if err != nil { - return err - } - return c.cdc.UnmarshalBinaryBare(result, ptr) -} - // QueryWithData performs a query to a Tendermint node with the provided path // and a data payload. It returns the result and height of the query upon success // or an error if the query fails. diff --git a/cosmos/module_client.go b/cosmos/module_client.go new file mode 100644 index 000000000..2bb8d0e5f --- /dev/null +++ b/cosmos/module_client.go @@ -0,0 +1,284 @@ +package cosmos + +import ( + "context" + "fmt" + + executionpb "github.com/mesg-foundation/engine/execution" + "github.com/mesg-foundation/engine/ext/xos" + "github.com/mesg-foundation/engine/hash" + instancepb "github.com/mesg-foundation/engine/instance" + ownershippb "github.com/mesg-foundation/engine/ownership" + processpb "github.com/mesg-foundation/engine/process" + "github.com/mesg-foundation/engine/protobuf/api" + runnerpb "github.com/mesg-foundation/engine/runner" + servicepb "github.com/mesg-foundation/engine/service" + "github.com/mesg-foundation/engine/x/execution" + "github.com/mesg-foundation/engine/x/instance" + "github.com/mesg-foundation/engine/x/ownership" + "github.com/mesg-foundation/engine/x/process" + "github.com/mesg-foundation/engine/x/runner" + "github.com/mesg-foundation/engine/x/service" +) + +type ModuleClient struct { + *Client +} + +func NewModuleClient(c *Client) *ModuleClient { + return &ModuleClient{Client: c} +} + +func sroutef(format string, args ...interface{}) string { + return fmt.Sprintf("custom/"+format, args...) +} + +// Create creates a new service from definition. +func (mc *ModuleClient) CreateService(req *api.CreateServiceRequest) (*servicepb.Service, error) { + acc, err := mc.GetAccount() + if err != nil { + return nil, err + } + msg := service.NewMsgCreateService(acc.GetAddress(), req) + tx, err := mc.BuildAndBroadcastMsg(msg) + if err != nil { + return nil, err + } + return mc.GetService(tx.Data) +} + +// GetService returns the service that matches given hash. +func (mc *ModuleClient) GetService(hash hash.Hash) (*servicepb.Service, error) { + var out *servicepb.Service + route := sroutef("%s/%s/%s", service.QuerierRoute, service.QueryGetService, hash) + return out, mc.QueryJSON(route, nil, &out) +} + +// ListService returns all services. +func (mc *ModuleClient) ListService() ([]*servicepb.Service, error) { + var out []*servicepb.Service + route := sroutef("%s/%s", service.QuerierRoute, service.QueryListService) + return out, mc.QueryJSON(route, nil, &out) +} + +// ExistService returns if a service already exists. +func (mc *ModuleClient) ExistService(hash hash.Hash) (bool, error) { + var out bool + route := sroutef("%s/%s/%s", service.QuerierRoute, service.QueryExistService, hash) + return out, mc.QueryJSON(route, nil, &out) +} + +// HashService returns the calculate hash of a service. +func (mc *ModuleClient) HashService(req *api.CreateServiceRequest) (hash.Hash, error) { + var out hash.Hash + route := sroutef("%s/%s", service.QuerierRoute, service.QueryHashService) + return out, mc.QueryJSON(route, req, &out) +} + +// CreateProcess creates a new process. +func (mc *ModuleClient) CreateProcess(req *api.CreateProcessRequest) (*processpb.Process, error) { + acc, err := mc.GetAccount() + if err != nil { + return nil, err + } + msg := process.NewMsgCreateProcess(acc.GetAddress(), req) + tx, err := mc.BuildAndBroadcastMsg(msg) + if err != nil { + return nil, err + } + return mc.GetProcess(tx.Data) +} + +// GetInstance returns the instance that matches given hash. +func (mc *ModuleClient) GetInstance(hash hash.Hash) (*instancepb.Instance, error) { + var out *instancepb.Instance + route := sroutef("%s/%s/%s", instance.QuerierRoute, instance.QueryGetInstance, hash) + return out, mc.QueryJSON(route, nil, &out) +} + +// ListInstance returns all instances. +func (mc *ModuleClient) ListInstance(req *api.ListInstanceRequest) ([]*instancepb.Instance, error) { + var out []*instancepb.Instance + route := sroutef("%s/%s", instance.QuerierRoute, instance.QueryListInstances) + return out, mc.QueryJSON(route, req, &out) +} + +// ListOwnership returns all ownerships. +func (mc *ModuleClient) ListOwnership() ([]*ownershippb.Ownership, error) { + var out []*ownershippb.Ownership + route := sroutef("%s/%s", ownership.QuerierRoute, ownership.QueryListOwnerships) + return out, mc.QueryJSON(route, nil, &out) +} + +// DeleteProcess deletes the process by hash. +func (mc *ModuleClient) DeleteProcess(req *api.DeleteProcessRequest) error { + acc, err := mc.GetAccount() + if err != nil { + return err + } + msg := process.NewMsgDeleteProcess(acc.GetAddress(), req) + _, err = mc.BuildAndBroadcastMsg(msg) + return err +} + +// GetProcess returns the process that matches given hash. +func (mc *ModuleClient) GetProcess(hash hash.Hash) (*processpb.Process, error) { + var out *processpb.Process + route := sroutef("%s/%s/%s", process.QuerierRoute, process.QueryGetProcess, hash.String()) + return out, mc.QueryJSON(route, nil, &out) +} + +// ListProcess returns all processes. +func (mc *ModuleClient) ListProcess() ([]*processpb.Process, error) { + var out []*processpb.Process + route := sroutef("%s/%s", process.QuerierRoute, process.QueryListProcesses) + return out, mc.QueryJSON(route, nil, &out) +} + +// CreateExecution creates a new execution. +func (mc *ModuleClient) CreateExecution(req *api.CreateExecutionRequest) (*executionpb.Execution, error) { + acc, err := mc.GetAccount() + if err != nil { + return nil, err + } + msg := execution.NewMsgCreateExecution(req, acc.GetAddress()) + tx, err := mc.BuildAndBroadcastMsg(msg) + if err != nil { + return nil, err + } + return mc.GetExecution(tx.Data) +} + +// UpdateExecution updates a execution. +func (mc *ModuleClient) UpdateExecution(req *api.UpdateExecutionRequest) (*executionpb.Execution, error) { + acc, err := mc.GetAccount() + if err != nil { + return nil, err + } + msg := execution.NewMsgUpdateExecution(req, acc.GetAddress()) + tx, err := mc.BuildAndBroadcastMsg(msg) + if err != nil { + return nil, err + } + return mc.GetExecution(tx.Data) +} + +// GetExecution returns the execution that matches given hash. +func (mc *ModuleClient) GetExecution(hash hash.Hash) (*executionpb.Execution, error) { + var out *executionpb.Execution + route := sroutef("%s/%s/%s", execution.QuerierRoute, execution.QueryGetExecution, hash) + return out, mc.QueryJSON(route, nil, &out) +} + +// ListExecution returns all executions. +func (mc *ModuleClient) ListExecution() ([]*executionpb.Execution, error) { + var out []*executionpb.Execution + route := sroutef("%s/%s", execution.QuerierRoute, execution.QueryListExecution) + return out, mc.QueryJSON(route, nil, &out) +} + +// StreamExecution returns execution that matches given hash. +func (mc *ModuleClient) StreamExecution(ctx context.Context, req *api.StreamExecutionRequest) (chan *executionpb.Execution, chan error, error) { + if err := req.Filter.Validate(); err != nil { + return nil, nil, err + } + + stream, serrC, err := mc.Stream(ctx, EventModuleQuery(execution.ModuleName)) + if err != nil { + return nil, nil, err + } + + execC := make(chan *executionpb.Execution) + errC := make(chan error) + go func() { + loop: + for { + select { + case hash := <-stream: + exec, err := mc.GetExecution(hash) + if err != nil { + errC <- err + break + } + if req.Filter.Match(exec) { + execC <- exec + } + case err := <-serrC: + errC <- err + case <-ctx.Done(): + break loop + } + } + close(errC) + close(execC) + }() + return execC, errC, nil +} + +// CreateRunner creates a new runner. +func (mc *ModuleClient) CreateRunner(req *api.CreateRunnerRequest) (*runnerpb.Runner, error) { + s, err := mc.GetService(req.ServiceHash) + if err != nil { + return nil, err + } + envHash := hash.Dump(xos.EnvMergeSlices(s.Configuration.Env, req.Env)) + acc, err := mc.GetAccount() + if err != nil { + return nil, err + } + + msg := runner.NewMsgCreateRunner(acc.GetAddress(), req.ServiceHash, envHash) + tx, err := mc.BuildAndBroadcastMsg(msg) + if err != nil { + return nil, err + } + return mc.GetRunner(tx.Data) +} + +// DeleteRunner deletes an existing runner. +func (mc *ModuleClient) DeleteRunner(req *api.DeleteRunnerRequest) error { + acc, err := mc.GetAccount() + if err != nil { + return err + } + msg := runner.NewMsgDeleteRunner(acc.GetAddress(), req.Hash) + _, err = mc.BuildAndBroadcastMsg(msg) + return err +} + +// GetRunner returns the runner that matches given hash. +func (mc *ModuleClient) GetRunner(hash hash.Hash) (*runnerpb.Runner, error) { + var out *runnerpb.Runner + route := sroutef("%s/%s/%s", runner.QuerierRoute, runner.QueryGetRunner, hash) + return out, mc.QueryJSON(route, nil, &out) +} + +// FilterRunner to apply while listing runners. +type FilterRunner struct { + Address string + InstanceHash hash.Hash +} + +// ListRunner returns all runners. +func (mc *ModuleClient) ListRunner(f *FilterRunner) ([]*runnerpb.Runner, error) { + var rs []*runnerpb.Runner + route := sroutef("%s/%s", runner.QuerierRoute, runner.QueryListRunners) + if err := mc.QueryJSON(route, nil, &rs); err != nil { + return nil, err + } + + // no filter, returns + if f == nil { + return rs, nil + } + + // filter results + out := make([]*runnerpb.Runner, 0) + for _, r := range rs { + if (f.Address == "" || r.Address == f.Address) && + (f.InstanceHash.IsZero() || r.InstanceHash.Equal(f.InstanceHash)) { + out = append(out, r) + } + } + return out, nil +} diff --git a/event/event.go b/event/event.go index 7bf22f3ac..42ec2f972 100644 --- a/event/event.go +++ b/event/event.go @@ -5,8 +5,8 @@ import ( "github.com/mesg-foundation/engine/protobuf/types" ) -// Create creates an event eventKey with eventData for service s. -func Create(instanceHash hash.Hash, eventKey string, eventData *types.Struct) *Event { +// New creates an event eventKey with eventData for service s. +func New(instanceHash hash.Hash, eventKey string, eventData *types.Struct) *Event { e := &Event{ InstanceHash: instanceHash, Key: eventKey, diff --git a/sdk/event/event_listener.go b/event/event_listener.go similarity index 85% rename from sdk/event/event_listener.go rename to event/event_listener.go index 6a720292a..e0e131488 100644 --- a/sdk/event/event_listener.go +++ b/event/event_listener.go @@ -1,8 +1,7 @@ -package eventsdk +package event import ( "github.com/cskr/pubsub" - "github.com/mesg-foundation/engine/event" "github.com/mesg-foundation/engine/hash" ) @@ -14,7 +13,7 @@ type Filter struct { } // Match matches event. -func (f *Filter) Match(e *event.Event) bool { +func (f *Filter) Match(e *Event) bool { if f == nil { return true } @@ -41,7 +40,7 @@ func (f *Filter) HasKey() bool { // Listener provides functionalities to listen MESG events. type Listener struct { - C chan *event.Event + C chan *Event ps *pubsub.PubSub topic string @@ -53,7 +52,7 @@ type Listener struct { // NewListener creates a new Listener with given sdk and filters. func NewListener(ps *pubsub.PubSub, topic string, f *Filter) *Listener { return &Listener{ - C: make(chan *event.Event, 1), + C: make(chan *Event, 1), ps: ps, topic: topic, c: ps.Sub(topic), @@ -72,7 +71,7 @@ func (l *Listener) Close() { // Listen listens events that match filter. func (l *Listener) Listen() { for v := range l.c { - if e := v.(*event.Event); l.filter.Match(e) { + if e := v.(*Event); l.filter.Match(e) { l.C <- e } } diff --git a/sdk/event/event_listener_test.go b/event/event_listener_test.go similarity index 70% rename from sdk/event/event_listener_test.go rename to event/event_listener_test.go index 38fa23765..521c757ef 100644 --- a/sdk/event/event_listener_test.go +++ b/event/event_listener_test.go @@ -1,10 +1,9 @@ -package eventsdk +package event import ( "testing" "github.com/cskr/pubsub" - "github.com/mesg-foundation/engine/event" "github.com/mesg-foundation/engine/hash" "github.com/stretchr/testify/assert" ) @@ -12,7 +11,7 @@ import ( func TestFilter(t *testing.T) { var tests = []struct { f *Filter - e *event.Event + e *Event match bool }{ { @@ -22,42 +21,42 @@ func TestFilter(t *testing.T) { }, { &Filter{}, - &event.Event{}, + &Event{}, true, }, { &Filter{Hash: hash.Int(1)}, - &event.Event{Hash: hash.Int(1)}, + &Event{Hash: hash.Int(1)}, true, }, { &Filter{Hash: hash.Int(1)}, - &event.Event{Hash: hash.Int(2)}, + &Event{Hash: hash.Int(2)}, false, }, { &Filter{InstanceHash: hash.Int(1)}, - &event.Event{InstanceHash: hash.Int(1)}, + &Event{InstanceHash: hash.Int(1)}, true, }, { &Filter{InstanceHash: hash.Int(1)}, - &event.Event{InstanceHash: hash.Int(1)}, + &Event{InstanceHash: hash.Int(1)}, true, }, { &Filter{Key: "0"}, - &event.Event{Key: "0"}, + &Event{Key: "0"}, true, }, { &Filter{Key: "*"}, - &event.Event{Key: "0"}, + &Event{Key: "0"}, true, }, { &Filter{Key: "0"}, - &event.Event{Key: "1"}, + &Event{Key: "1"}, false, }, } @@ -69,12 +68,12 @@ func TestFilter(t *testing.T) { func TestEventListener(t *testing.T) { topic := "test-topic" - testEvent := &event.Event{Key: "0"} + testEvent := &Event{Key: "0"} ps := pubsub.New(0) el := NewListener(ps, topic, &Filter{Key: "0"}) go func() { - ps.Pub(&event.Event{Key: "1"}, topic) + ps.Pub(&Event{Key: "1"}, topic) ps.Pub(testEvent, topic) }() go el.Listen() diff --git a/event/publisher/publisher.go b/event/publisher/publisher.go new file mode 100644 index 000000000..e8631a2f0 --- /dev/null +++ b/event/publisher/publisher.go @@ -0,0 +1,56 @@ +package publisher + +import ( + "github.com/cskr/pubsub" + "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/event" + "github.com/mesg-foundation/engine/hash" + "github.com/mesg-foundation/engine/protobuf/types" +) + +const ( + // streamTopic is topic used to broadcast events. + streamTopic = "event-stream" +) + +// EventPublisher exposes event APIs of MESG. +type EventPublisher struct { + ps *pubsub.PubSub + mc *cosmos.ModuleClient +} + +// New creates a new Event SDK with given options. +func New(mc *cosmos.ModuleClient) *EventPublisher { + return &EventPublisher{ + ps: pubsub.New(0), + mc: mc, + } +} + +// Publish a MESG event eventKey with eventData for service token. +func (ep *EventPublisher) Publish(instanceHash hash.Hash, eventKey string, eventData *types.Struct) (*event.Event, error) { + i, err := ep.mc.GetInstance(instanceHash) + if err != nil { + return nil, err + } + + s, err := ep.mc.GetService(i.ServiceHash) + if err != nil { + return nil, err + } + + if err := s.RequireEventData(eventKey, eventData); err != nil { + return nil, err + } + + e := event.New(instanceHash, eventKey, eventData) + go ep.ps.Pub(e, streamTopic) + return e, nil +} + +// GetStream broadcasts all events. +func (ep *EventPublisher) GetStream(f *event.Filter) *event.Listener { + l := event.NewListener(ep.ps, streamTopic, f) + go l.Listen() + return l +} diff --git a/go.mod b/go.mod index e8b95ae85..a5f3b155a 100644 --- a/go.mod +++ b/go.mod @@ -55,10 +55,8 @@ require ( github.com/tendermint/go-amino v0.15.1 github.com/tendermint/tendermint v0.33.0 github.com/tendermint/tm-db v0.4.0 - github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 // indirect - golang.org/x/tools v0.0.0-20190813142322-97f12d73768f // indirect google.golang.org/grpc v1.27.1 gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/validator.v9 v9.31.0 diff --git a/go.sum b/go.sum index 3368af69a..8596dfa52 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,6 @@ github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8Nz github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cosmos/cosmos-sdk v0.38.0 h1:BrflLMrECI2ZfftRAq2iAlxlyk+W/4iKVCNCC3a+RPc= -github.com/cosmos/cosmos-sdk v0.38.0/go.mod h1:9ZZex0GKpyNCvilvVAPBoB+0n3A/aO1+/UhPVEaiCy4= github.com/cosmos/cosmos-sdk v0.38.1 h1:DTuxIJeMpB//ydq+ObAjQgsaiwYBZ8T7NDzXjyiL1Kg= github.com/cosmos/cosmos-sdk v0.38.1/go.mod h1:9ZZex0GKpyNCvilvVAPBoB+0n3A/aO1+/UhPVEaiCy4= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU= @@ -290,8 +288,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= -github.com/prometheus/client_golang v1.4.0 h1:YVIb/fVcOTMSqtqZWSKnHpSLBxu8DKgxq8z6RuBZwqI= -github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.4.1 h1:FFSuS004yOQEtDdTq+TAOLP5xUq63KqAFYyOi8zA+Y8= github.com/prometheus/client_golang v1.4.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -393,8 +389,6 @@ github.com/tendermint/tm-db v0.4.0/go.mod h1:+Cwhgowrf7NBGXmsqFMbwEtbo80XmyrlY5J github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO8DJg/FJtn/8Fj9NOEpO6+WuMPmk= -github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/zondax/hid v0.9.0 h1:eiT3P6vNxAEVxXMw66eZUAAnU2zD33JBkfG/EnfAKl8= @@ -428,8 +422,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA= @@ -471,8 +463,6 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181112210238-4b1f3b6b1646 h1:JEEoTsNEpPwxsebhPLC6P2jNr+6RFZLY4elUBVcMb+I= -golang.org/x/tools v0.0.0-20181112210238-4b1f3b6b1646/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c h1:vamGzbGri8IKo20MQncCuljcQ5uAO6kaCeawQPVblAI= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -480,9 +470,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190813142322-97f12d73768f h1:nQv5Lx4ucsmk8T4jkEQKJu7YLkYXy/PLoZgTpnIrkuI= -golang.org/x/tools v0.0.0-20190813142322-97f12d73768f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -496,8 +483,6 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/internal/tools/tools.go b/internal/tools/tools.go index c02b0930a..a185d6aeb 100644 --- a/internal/tools/tools.go +++ b/internal/tools/tools.go @@ -8,5 +8,4 @@ import ( _ "github.com/go-bindata/go-bindata/go-bindata" _ "github.com/golang/protobuf/protoc-gen-go" _ "github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc" - _ "github.com/vektra/mockery/cmd/mockery" ) diff --git a/orchestrator/mocks/EventSDK.go b/orchestrator/mocks/EventSDK.go deleted file mode 100644 index 888406ca4..000000000 --- a/orchestrator/mocks/EventSDK.go +++ /dev/null @@ -1,27 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import eventsdk "github.com/mesg-foundation/engine/sdk/event" -import mock "github.com/stretchr/testify/mock" - -// EventSDK is an autogenerated mock type for the EventSDK type -type EventSDK struct { - mock.Mock -} - -// GetStream provides a mock function with given fields: f -func (_m *EventSDK) GetStream(f *eventsdk.Filter) *eventsdk.Listener { - ret := _m.Called(f) - - var r0 *eventsdk.Listener - if rf, ok := ret.Get(0).(func(*eventsdk.Filter) *eventsdk.Listener); ok { - r0 = rf(f) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*eventsdk.Listener) - } - } - - return r0 -} diff --git a/orchestrator/mocks/ExecutionSDK.go b/orchestrator/mocks/ExecutionSDK.go deleted file mode 100644 index 824f9600a..000000000 --- a/orchestrator/mocks/ExecutionSDK.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import api "github.com/mesg-foundation/engine/protobuf/api" -import context "context" -import execution "github.com/mesg-foundation/engine/execution" -import hash "github.com/mesg-foundation/engine/hash" -import mock "github.com/stretchr/testify/mock" - -// ExecutionSDK is an autogenerated mock type for the ExecutionSDK type -type ExecutionSDK struct { - mock.Mock -} - -// Create provides a mock function with given fields: req -func (_m *ExecutionSDK) Create(req *api.CreateExecutionRequest) (*execution.Execution, error) { - ret := _m.Called(req) - - var r0 *execution.Execution - if rf, ok := ret.Get(0).(func(*api.CreateExecutionRequest) *execution.Execution); ok { - r0 = rf(req) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*execution.Execution) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(*api.CreateExecutionRequest) error); ok { - r1 = rf(req) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Get provides a mock function with given fields: _a0 -func (_m *ExecutionSDK) Get(_a0 hash.Hash) (*execution.Execution, error) { - ret := _m.Called(_a0) - - var r0 *execution.Execution - if rf, ok := ret.Get(0).(func(hash.Hash) *execution.Execution); ok { - r0 = rf(_a0) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*execution.Execution) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(hash.Hash) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Stream provides a mock function with given fields: ctx, req -func (_m *ExecutionSDK) Stream(ctx context.Context, req *api.StreamExecutionRequest) (chan *execution.Execution, chan error, error) { - ret := _m.Called(ctx, req) - - var r0 chan *execution.Execution - if rf, ok := ret.Get(0).(func(context.Context, *api.StreamExecutionRequest) chan *execution.Execution); ok { - r0 = rf(ctx, req) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(chan *execution.Execution) - } - } - - var r1 chan error - if rf, ok := ret.Get(1).(func(context.Context, *api.StreamExecutionRequest) chan error); ok { - r1 = rf(ctx, req) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(chan error) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context, *api.StreamExecutionRequest) error); ok { - r2 = rf(ctx, req) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} diff --git a/orchestrator/mocks/ProcessSDK.go b/orchestrator/mocks/ProcessSDK.go deleted file mode 100644 index aa15b5a72..000000000 --- a/orchestrator/mocks/ProcessSDK.go +++ /dev/null @@ -1,35 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -import process "github.com/mesg-foundation/engine/process" - -// ProcessSDK is an autogenerated mock type for the ProcessSDK type -type ProcessSDK struct { - mock.Mock -} - -// List provides a mock function with given fields: -func (_m *ProcessSDK) List() ([]*process.Process, error) { - ret := _m.Called() - - var r0 []*process.Process - if rf, ok := ret.Get(0).(func() []*process.Process); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*process.Process) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} diff --git a/orchestrator/mocks/RunnerSDK.go b/orchestrator/mocks/RunnerSDK.go deleted file mode 100644 index 53d2031f1..000000000 --- a/orchestrator/mocks/RunnerSDK.go +++ /dev/null @@ -1,36 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -import runner "github.com/mesg-foundation/engine/runner" -import runnersdk "github.com/mesg-foundation/engine/sdk/runner" - -// RunnerSDK is an autogenerated mock type for the RunnerSDK type -type RunnerSDK struct { - mock.Mock -} - -// List provides a mock function with given fields: f -func (_m *RunnerSDK) List(f *runnersdk.Filter) ([]*runner.Runner, error) { - ret := _m.Called(f) - - var r0 []*runner.Runner - if rf, ok := ret.Get(0).(func(*runnersdk.Filter) []*runner.Runner); ok { - r0 = rf(f) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*runner.Runner) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(*runnersdk.Filter) error); ok { - r1 = rf(f) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 1fb371514..902fc30c8 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -6,24 +6,23 @@ import ( "fmt" "math/rand" + "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/event" + "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/execution" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/process" "github.com/mesg-foundation/engine/protobuf/api" "github.com/mesg-foundation/engine/protobuf/types" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" "github.com/sirupsen/logrus" ) // New creates a new Process instance -func New(event EventSDK, execution ExecutionSDK, process ProcessSDK, runner RunnerSDK) *Orchestrator { +func New(mc *cosmos.ModuleClient, ep *publisher.EventPublisher) *Orchestrator { return &Orchestrator{ - event: event, - execution: execution, - process: process, - runner: runner, - ErrC: make(chan error), + mc: mc, + ep: ep, + ErrC: make(chan error), } } @@ -33,12 +32,12 @@ func (s *Orchestrator) Start() error { return fmt.Errorf("process orchestrator already running") } - s.eventStream = s.event.GetStream(nil) + s.eventStream = s.ep.GetStream(nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - executionStream, errC, err := s.execution.Stream(ctx, &api.StreamExecutionRequest{ + executionStream, errC, err := s.mc.StreamExecution(ctx, &api.StreamExecutionRequest{ Filter: &api.StreamExecutionRequest_Filter{ Statuses: []execution.Status{execution.Status_Completed}, }, @@ -105,7 +104,7 @@ func (s *Orchestrator) findNodes(wf *process.Process, filter func(wf *process.Pr } func (s *Orchestrator) execute(filter func(wf *process.Process, node *process.Process_Node) (bool, error), exec *execution.Execution, event *event.Event, data *types.Struct) { - processes, err := s.process.List() + processes, err := s.mc.ListProcess() if err != nil { s.ErrC <- err return @@ -231,7 +230,7 @@ func (s *Orchestrator) resolveInput(wfHash hash.Hash, exec *execution.Execution, return nil, fmt.Errorf("reference's nodeKey not found") } if exec.NodeKey != nodeKey { - parent, err := s.execution.Get(exec.ParentHash) + parent, err := s.mc.GetExecution(exec.ParentHash) if err != nil { return nil, err } @@ -248,7 +247,7 @@ func (s *Orchestrator) processTask(nodeKey string, task *process.Process_Node_Ta if exec != nil { execHash = exec.Hash } - executors, err := s.runner.List(&runnersdk.Filter{ + executors, err := s.mc.ListRunner(&cosmos.FilterRunner{ InstanceHash: task.InstanceHash, }) if err != nil { @@ -258,7 +257,7 @@ func (s *Orchestrator) processTask(nodeKey string, task *process.Process_Node_Ta return fmt.Errorf("no runner is running instance %q", task.InstanceHash) } executor := executors[rand.Intn(len(executors))] - _, err = s.execution.Create(&api.CreateExecutionRequest{ + _, err = s.mc.CreateExecution(&api.CreateExecutionRequest{ ProcessHash: wf.Hash, EventHash: eventHash, ParentHash: execHash, diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go deleted file mode 100644 index ac10aecd8..000000000 --- a/orchestrator/orchestrator_test.go +++ /dev/null @@ -1,271 +0,0 @@ -package orchestrator - -import ( - "fmt" - "testing" - - "github.com/mesg-foundation/engine/event" - "github.com/mesg-foundation/engine/execution" - "github.com/mesg-foundation/engine/hash" - "github.com/mesg-foundation/engine/orchestrator/mocks" - "github.com/mesg-foundation/engine/process" - "github.com/mesg-foundation/engine/protobuf/types" - "github.com/mesg-foundation/engine/runner" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func TestFilter(t *testing.T) { - o := New(&mocks.EventSDK{}, &mocks.ExecutionSDK{}, &mocks.ProcessSDK{}, &mocks.RunnerSDK{}) - p := process.Process{ - Hash: hash.Int(1), - Nodes: []*process.Process_Node{ - { - Key: "1", - Type: &process.Process_Node_Event_{ - Event: &process.Process_Node_Event{ - InstanceHash: hash.Int(1), - EventKey: "1", - }, - }, - }, - { - Key: "2", - Type: &process.Process_Node_Task_{Task: &process.Process_Node_Task{ - InstanceHash: hash.Int(2), - TaskKey: "2", - }}}, - { - Key: "3", - Type: &process.Process_Node_Task_{Task: &process.Process_Node_Task{ - InstanceHash: hash.Int(3), - TaskKey: "3", - }}}, - }, - Edges: []*process.Process_Edge{ - {Src: "1", Dst: "2"}, - {Src: "2", Dst: "3"}, - }, - } - var tests = []struct { - filter func(wf *process.Process, node *process.Process_Node) (bool, error) - p *process.Process - n *process.Process_Node - res bool - err error - }{ - { - filter: o.eventFilter(&event.Event{InstanceHash: hash.Int(1), Key: "1"}), - n: p.Nodes[0], - res: true, - err: nil, - }, - { - filter: o.eventFilter(&event.Event{InstanceHash: hash.Int(1), Key: "2"}), - n: p.Nodes[0], - res: false, - err: nil, - }, - { - filter: o.eventFilter(&event.Event{InstanceHash: hash.Int(2), Key: "1"}), - n: p.Nodes[0], - res: false, - err: nil, - }, - { - filter: o.eventFilter(&event.Event{InstanceHash: hash.Int(2), Key: "1"}), - n: p.Nodes[1], - res: false, - err: nil, - }, - { - filter: o.resultFilter(&execution.Execution{InstanceHash: hash.Int(1), TaskKey: "1"}), - n: &process.Process_Node{Type: &process.Process_Node_Result_{Result: &process.Process_Node_Result{ - InstanceHash: hash.Int(1), - TaskKey: "1", - }}}, - res: true, - err: nil, - }, - { - filter: o.resultFilter(&execution.Execution{InstanceHash: hash.Int(1), TaskKey: "1"}), - n: &process.Process_Node{Type: &process.Process_Node_Result_{Result: &process.Process_Node_Result{ - InstanceHash: hash.Int(1), - TaskKey: "2", - }}}, - res: false, - err: nil, - }, - { - filter: o.resultFilter(&execution.Execution{InstanceHash: hash.Int(1), TaskKey: "1"}), - n: &process.Process_Node{Type: &process.Process_Node_Result_{Result: &process.Process_Node_Result{ - InstanceHash: hash.Int(2), - TaskKey: "1", - }}}, - res: false, - err: nil, - }, - { - filter: o.resultFilter(&execution.Execution{InstanceHash: hash.Int(1), TaskKey: "1"}), - n: p.Nodes[0], - res: false, - err: nil, - }, - { - filter: o.dependencyFilter(&execution.Execution{InstanceHash: hash.Int(3), TaskKey: "2", ProcessHash: hash.Int(1), NodeKey: "2"}), - p: &p, - n: p.Nodes[2], - res: true, - err: nil, - }, - { - filter: o.dependencyFilter(&execution.Execution{InstanceHash: hash.Int(3), TaskKey: "2", ProcessHash: hash.Int(2), NodeKey: "2"}), - p: &p, - n: p.Nodes[2], - res: false, - err: nil, - }, - { - filter: o.dependencyFilter(&execution.Execution{InstanceHash: hash.Int(3), TaskKey: "2", ProcessHash: hash.Int(1), NodeKey: "1"}), - p: &p, - n: p.Nodes[2], - res: false, - err: nil, - }, - { - filter: o.dependencyFilter(&execution.Execution{InstanceHash: hash.Int(3), TaskKey: "2", ProcessHash: hash.Int(1), NodeKey: "2"}), - p: &p, - n: p.Nodes[0], - res: false, - err: nil, - }, - } - for _, test := range tests { - ok, err := test.filter(test.p, test.n) - if test.err != nil { - require.Equal(t, test.err, err) - } else { - require.Equal(t, ok, test.res) - } - } -} - -func TestFindNode(t *testing.T) { - o := New(&mocks.EventSDK{}, &mocks.ExecutionSDK{}, &mocks.ProcessSDK{}, &mocks.RunnerSDK{}) - data := &process.Process{ - Hash: hash.Int(1), - Nodes: []*process.Process_Node{ - { - Key: "1", - Type: &process.Process_Node_Event_{ - Event: &process.Process_Node_Event{}, - }, - }, - }, - } - require.Len(t, o.findNodes(data, func(p *process.Process, n *process.Process_Node) (bool, error) { - return true, nil - }), 1) - require.Len(t, o.findNodes(data, func(p *process.Process, n *process.Process_Node) (bool, error) { - return n.Key == "1", nil - }), 1) - require.Len(t, o.findNodes(data, func(p *process.Process, n *process.Process_Node) (bool, error) { - return n.Key == "2", nil - }), 0) -} - -// func TestProcessMap(t *testing.T) { -// e := &mocks.ExecutionSDK{} -// o := New(&mocks.EventSDK{}, e, &mocks.ProcessSDK{}) -// exec := &execution.Execution{ -// ProcessHash: hash.Int(1), -// NodeKey: "1", -// ParentHash: hash.Int(2), -// Outputs: &types.Struct{ -// Fields: map[string]*types.Value{ -// "xxx": &types.Value{ -// Kind: &types.Value_StringValue{StringValue: "str"}, -// }, -// }, -// }, -// } -// o.processMap(&process.Process_Node_Map{ -// Outputs: []*process.Process_Node_Map_Output{}, -// }, ) -// } - -func TestResolveInput(t *testing.T) { - e := &mocks.ExecutionSDK{} - o := New(&mocks.EventSDK{}, e, &mocks.ProcessSDK{}, &mocks.RunnerSDK{}) - exec := &execution.Execution{ - ProcessHash: hash.Int(2), - NodeKey: "2", - ParentHash: hash.Int(3), - Outputs: &types.Struct{ - Fields: map[string]*types.Value{ - "xxx": { - Kind: &types.Value_StringValue{StringValue: "str"}, - }, - }, - }, - } - // Different processes - _, err := o.resolveInput(hash.Int(1), exec, "2", &process.Process_Node_Map_Output_Reference_Path{Selector: &process.Process_Node_Map_Output_Reference_Path_Key{Key: "xxx"}}) - require.Error(t, err) - // Different steps, should return the value of the data - val, err := o.resolveInput(hash.Int(2), exec, "2", &process.Process_Node_Map_Output_Reference_Path{Selector: &process.Process_Node_Map_Output_Reference_Path_Key{Key: "xxx"}}) - require.NoError(t, err) - require.Equal(t, val, exec.Outputs.Fields["xxx"]) - // Invalid execution parent hash - e.On("Get", mock.Anything).Once().Return(nil, fmt.Errorf("err")) - _, err = o.resolveInput(hash.Int(2), exec, "-", &process.Process_Node_Map_Output_Reference_Path{Selector: &process.Process_Node_Map_Output_Reference_Path_Key{Key: "xxx"}}) - require.Error(t, err) - // Output from a previous exec - execMock := &execution.Execution{ - NodeKey: "3", - ProcessHash: hash.Int(2), - Outputs: &types.Struct{ - Fields: map[string]*types.Value{ - "yyy": { - Kind: &types.Value_StringValue{StringValue: "str2"}, - }, - }, - }, - } - e.On("Get", mock.Anything).Once().Return(execMock, nil) - val, err = o.resolveInput(hash.Int(2), exec, "3", &process.Process_Node_Map_Output_Reference_Path{Selector: &process.Process_Node_Map_Output_Reference_Path_Key{Key: "yyy"}}) - require.NoError(t, err) - require.Equal(t, val, execMock.Outputs.Fields["yyy"]) -} - -func TestProcessTask(t *testing.T) { - e := &mocks.ExecutionSDK{} - e.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once().Return(nil, nil) - r := &mocks.RunnerSDK{} - r.On("List", mock.Anything).Once().Return([]*runner.Runner{{Hash: hash.Int(1)}}, nil) - o := New(&mocks.EventSDK{}, e, &mocks.ProcessSDK{}, r) - err := o.processTask("-", &process.Process_Node_Task{ - InstanceHash: hash.Int(1), - TaskKey: "-", - }, &process.Process{ - Hash: hash.Int(2), - }, &execution.Execution{ - Hash: hash.Int(3), - }, nil, &types.Struct{ - Fields: map[string]*types.Value{}, - }) - require.NoError(t, err) - e.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once().Return(nil, fmt.Errorf("error")) - r.On("List", mock.Anything).Once().Return([]*runner.Runner{{Hash: hash.Int(1)}}, nil) - err = o.processTask("-", &process.Process_Node_Task{ - InstanceHash: hash.Int(1), - TaskKey: "-", - }, &process.Process{ - Hash: hash.Int(2), - }, nil, &event.Event{ - Hash: hash.Int(3), - }, &types.Struct{ - Fields: map[string]*types.Value{}, - }) - require.Error(t, err) -} diff --git a/orchestrator/type.go b/orchestrator/type.go index edfa8fe34..cf19adc26 100644 --- a/orchestrator/type.go +++ b/orchestrator/type.go @@ -1,50 +1,19 @@ package orchestrator import ( - "context" - + "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/event" + "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/execution" - "github.com/mesg-foundation/engine/hash" - "github.com/mesg-foundation/engine/process" - "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/runner" - eventsdk "github.com/mesg-foundation/engine/sdk/event" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" ) -// ExecutionSDK execution interface needed for the orchestrator -type ExecutionSDK interface { - Stream(ctx context.Context, req *api.StreamExecutionRequest) (chan *execution.Execution, chan error, error) - Get(hash hash.Hash) (*execution.Execution, error) - Create(req *api.CreateExecutionRequest) (*execution.Execution, error) -} - -// EventSDK event interface needed for the orchestrator -type EventSDK interface { - GetStream(f *eventsdk.Filter) *eventsdk.Listener -} - -// ProcessSDK process interface needed for the orchestrator -type ProcessSDK interface { - List() ([]*process.Process, error) -} - -// RunnerSDK is the interface of the runner sdk needed for the orchestrator -type RunnerSDK interface { - List(f *runnersdk.Filter) ([]*runner.Runner, error) -} - // Orchestrator manages the executions based on the definition of the processes type Orchestrator struct { - event EventSDK - eventStream *eventsdk.Listener + mc *cosmos.ModuleClient + ep *publisher.EventPublisher - execution ExecutionSDK + eventStream *event.Listener executionStream <-chan *execution.Execution - process ProcessSDK - - runner RunnerSDK - ErrC chan error } diff --git a/runner/builder/builder.go b/runner/builder/builder.go new file mode 100644 index 000000000..b5022f147 --- /dev/null +++ b/runner/builder/builder.go @@ -0,0 +1,124 @@ +package builder + +import ( + "errors" + "fmt" + + "github.com/mesg-foundation/engine/container" + "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/ext/xos" + "github.com/mesg-foundation/engine/hash" + instancepb "github.com/mesg-foundation/engine/instance" + "github.com/mesg-foundation/engine/protobuf/api" + runnerpb "github.com/mesg-foundation/engine/runner" +) + +// Builder is the runner builder. +type Builder struct { + mc *cosmos.ModuleClient + container container.Container + engineName string + port string + ipfsEndpoint string +} + +// New returns the runner sdk. +func New(mc *cosmos.ModuleClient, container container.Container, engineName, port, ipfsEndpoint string) *Builder { + sdk := &Builder{ + container: container, + mc: mc, + engineName: engineName, + port: port, + ipfsEndpoint: ipfsEndpoint, + } + return sdk +} + +// Create creates a new runner. +func (b *Builder) Create(req *api.CreateRunnerRequest) (*runnerpb.Runner, error) { + // calculate instance's hash. + // TODO: this should be merged with the same logic currently in instance sdk + srv, err := b.mc.GetService(req.ServiceHash) + if err != nil { + return nil, err + } + + instanceEnv := xos.EnvMergeSlices(srv.Configuration.Env, req.Env) + envHash := hash.Dump(instanceEnv) + // TODO: should be done by instance or runner + instanceHash := hash.Dump(&instancepb.Instance{ + ServiceHash: srv.Hash, + EnvHash: envHash, + }) + acc, err := b.mc.GetAccount() + if err != nil { + return nil, err + } + expRunnerHash := hash.Dump(&runnerpb.Runner{ + Address: acc.GetAddress().String(), + InstanceHash: instanceHash, + }) + + if runExisting, _ := b.mc.GetRunner(expRunnerHash); runExisting != nil { + return nil, fmt.Errorf("runner %q already exists", runExisting.Hash) + } + + // start the container + imageHash, err := build(b.container, srv, b.ipfsEndpoint) + if err != nil { + return nil, err + } + _, err = start(b.container, srv, instanceHash, expRunnerHash, imageHash, instanceEnv, b.engineName, b.port) + if err != nil { + return nil, err + } + + run, err := b.mc.CreateRunner(req) + if err != nil { + stop(b.container, expRunnerHash, srv.Dependencies) + return nil, err + } + + if !run.Hash.Equal(expRunnerHash) { + stop(b.container, expRunnerHash, srv.Dependencies) + return nil, errors.New("calculated runner hash is not the same") + } + return run, nil +} + +// Delete deletes an existing runner. +func (b *Builder) Delete(req *api.DeleteRunnerRequest) error { + // get runner before deleting it + r, err := b.mc.GetRunner(req.Hash) + if err != nil { + return err + } + + if err := b.mc.DeleteRunner(req); err != nil { + return err + } + + inst, err := b.mc.GetInstance(r.InstanceHash) + if err != nil { + return err + } + + srv, err := b.mc.GetService(inst.ServiceHash) + if err != nil { + return err + } + + // stop the local running service + if err := stop(b.container, r.Hash, srv.Dependencies); err != nil { + return err + } + + // remove local volume + if req.DeleteData { + if err := deleteData(b.container, srv); err != nil { + return err + } + } + + return nil +} diff --git a/sdk/runner/container.go b/runner/builder/container.go similarity index 99% rename from sdk/runner/container.go rename to runner/builder/container.go index 2fa2a5f79..459339a24 100644 --- a/sdk/runner/container.go +++ b/runner/builder/container.go @@ -1,4 +1,4 @@ -package runnersdk +package builder import ( "errors" diff --git a/scripts/build-mocks.sh b/scripts/build-mocks.sh deleted file mode 100755 index 5dbd5bed8..000000000 --- a/scripts/build-mocks.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -e - -# generate mocks -mockery -name ExecutionSDK -dir ./orchestrator -output ./orchestrator/mocks -mockery -name EventSDK -dir ./orchestrator -output ./orchestrator/mocks -mockery -name ProcessSDK -dir ./orchestrator -output ./orchestrator/mocks -mockery -name RunnerSDK -dir ./orchestrator -output ./orchestrator/mocks diff --git a/sdk/event/event.go b/sdk/event/event.go deleted file mode 100644 index 17862fad3..000000000 --- a/sdk/event/event.go +++ /dev/null @@ -1,64 +0,0 @@ -package eventsdk - -import ( - "fmt" - - "github.com/cskr/pubsub" - "github.com/mesg-foundation/engine/cosmos" - "github.com/mesg-foundation/engine/event" - "github.com/mesg-foundation/engine/hash" - instancepb "github.com/mesg-foundation/engine/instance" - "github.com/mesg-foundation/engine/protobuf/types" - servicesdk "github.com/mesg-foundation/engine/sdk/service" - "github.com/mesg-foundation/engine/x/instance" -) - -const ( - // streamTopic is topic used to broadcast events. - streamTopic = "event-stream" -) - -// Event exposes event APIs of MESG. -type Event struct { - ps *pubsub.PubSub - client *cosmos.Client - service *servicesdk.SDK -} - -// New creates a new Event SDK with given options. -func New(ps *pubsub.PubSub, service *servicesdk.SDK, client *cosmos.Client) *Event { - return &Event{ - ps: ps, - client: client, - service: service, - } -} - -// Create a MESG event eventKey with eventData for service token. -func (e *Event) Create(instanceHash hash.Hash, eventKey string, eventData *types.Struct) (*event.Event, error) { - event := event.Create(instanceHash, eventKey, eventData) - - var inst instancepb.Instance - if err := e.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", instance.QuerierRoute, instance.QueryGetInstance, event.InstanceHash), nil, &inst); err != nil { - return nil, err - } - - service, err := e.service.Get(inst.ServiceHash) - if err != nil { - return nil, err - } - - if err := service.RequireEventData(event.Key, event.Data); err != nil { - return nil, err - } - - go e.ps.Pub(event, streamTopic) - return event, nil -} - -// GetStream broadcasts all events. -func (e *Event) GetStream(f *Filter) *Listener { - l := NewListener(e.ps, streamTopic, f) - go l.Listen() - return l -} diff --git a/sdk/execution/sdk.go b/sdk/execution/sdk.go deleted file mode 100644 index 56271a9fb..000000000 --- a/sdk/execution/sdk.go +++ /dev/null @@ -1,118 +0,0 @@ -package executionsdk - -import ( - "context" - "fmt" - - "github.com/mesg-foundation/engine/cosmos" - executionpb "github.com/mesg-foundation/engine/execution" - "github.com/mesg-foundation/engine/hash" - "github.com/mesg-foundation/engine/protobuf/api" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" - servicesdk "github.com/mesg-foundation/engine/sdk/service" - "github.com/mesg-foundation/engine/x/execution" -) - -// SDK is the execution sdk. -type SDK struct { - client *cosmos.Client - - serviceSDK *servicesdk.SDK - runnerSDK *runnersdk.SDK -} - -// New returns the execution sdk. -func New(client *cosmos.Client, serviceSDK *servicesdk.SDK, runnerSDK *runnersdk.SDK) *SDK { - sdk := &SDK{ - client: client, - serviceSDK: serviceSDK, - runnerSDK: runnerSDK, - } - return sdk -} - -// Create creates a new execution. -func (s *SDK) Create(req *api.CreateExecutionRequest) (*executionpb.Execution, error) { - execution.M.Created.Add(1) - acc, err := s.client.GetAccount() - if err != nil { - return nil, err - } - msg := execution.NewMsgCreateExecution(req, acc.GetAddress()) - tx, err := s.client.BuildAndBroadcastMsg(msg) - if err != nil { - return nil, err - } - return s.Get(tx.Data) -} - -// Update updates a execution. -func (s *SDK) Update(req *api.UpdateExecutionRequest) (*executionpb.Execution, error) { - execution.M.Updated.Add(1) - acc, err := s.client.GetAccount() - if err != nil { - return nil, err - } - msg := execution.NewMsgUpdateExecution(req, acc.GetAddress()) - tx, err := s.client.BuildAndBroadcastMsg(msg) - if err != nil { - return nil, err - } - return s.Get(tx.Data) -} - -// Get returns the execution that matches given hash. -func (s *SDK) Get(hash hash.Hash) (*executionpb.Execution, error) { - var e executionpb.Execution - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", execution.QuerierRoute, execution.QueryGetExecution, hash.String()), nil, &e); err != nil { - return nil, err - } - return &e, nil -} - -// List returns all executions. -func (s *SDK) List() ([]*executionpb.Execution, error) { - var es []*executionpb.Execution - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", execution.QuerierRoute, execution.QueryListExecution), nil, &es); err != nil { - return nil, err - } - return es, nil -} - -// Stream returns execution that matches given hash. -func (s *SDK) Stream(ctx context.Context, req *api.StreamExecutionRequest) (chan *executionpb.Execution, chan error, error) { - if err := req.Filter.Validate(); err != nil { - return nil, nil, err - } - - stream, serrC, err := s.client.Stream(ctx, cosmos.EventModuleQuery(execution.ModuleName)) - if err != nil { - return nil, nil, err - } - - execC := make(chan *executionpb.Execution) - errC := make(chan error) - go func() { - loop: - for { - select { - case hash := <-stream: - exec, err := s.Get(hash) - if err != nil { - errC <- err - break - } - if req.Filter.Match(exec) { - execC <- exec - } - case err := <-serrC: - errC <- err - case <-ctx.Done(): - break loop - } - } - close(errC) - close(execC) - }() - return execC, errC, nil -} diff --git a/sdk/process/sdk.go b/sdk/process/sdk.go deleted file mode 100644 index 35a110f33..000000000 --- a/sdk/process/sdk.go +++ /dev/null @@ -1,66 +0,0 @@ -package processsdk - -import ( - "fmt" - - "github.com/mesg-foundation/engine/cosmos" - "github.com/mesg-foundation/engine/hash" - processpb "github.com/mesg-foundation/engine/process" - "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/x/process" -) - -// SDK is the process sdk. -type SDK struct { - client *cosmos.Client -} - -// New creates a new Process SDK with given options. -func New(client *cosmos.Client) *SDK { - return &SDK{ - client: client, - } -} - -// Create creates a new process. -func (s *SDK) Create(req *api.CreateProcessRequest) (*processpb.Process, error) { - acc, err := s.client.GetAccount() - if err != nil { - return nil, err - } - msg := process.NewMsgCreateProcess(acc.GetAddress(), req) - tx, err := s.client.BuildAndBroadcastMsg(msg) - if err != nil { - return nil, err - } - return s.Get(tx.Data) -} - -// Delete deletes the process by hash. -func (s *SDK) Delete(req *api.DeleteProcessRequest) error { - acc, err := s.client.GetAccount() - if err != nil { - return err - } - msg := process.NewMsgDeleteProcess(acc.GetAddress(), req) - _, err = s.client.BuildAndBroadcastMsg(msg) - return err -} - -// Get returns the process that matches given hash. -func (s *SDK) Get(hash hash.Hash) (*processpb.Process, error) { - var p processpb.Process - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", process.QuerierRoute, process.QueryGetProcess, hash.String()), nil, &p); err != nil { - return nil, err - } - return &p, nil -} - -// List returns all processes. -func (s *SDK) List() ([]*processpb.Process, error) { - var processes []*processpb.Process - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", process.QuerierRoute, process.QueryListProcesses), nil, &processes); err != nil { - return nil, err - } - return processes, nil -} diff --git a/sdk/runner/sdk.go b/sdk/runner/sdk.go deleted file mode 100644 index 2dc897fd0..000000000 --- a/sdk/runner/sdk.go +++ /dev/null @@ -1,177 +0,0 @@ -package runnersdk - -import ( - "errors" - "fmt" - - "github.com/mesg-foundation/engine/container" - "github.com/mesg-foundation/engine/cosmos" - "github.com/mesg-foundation/engine/ext/xos" - "github.com/mesg-foundation/engine/hash" - instancepb "github.com/mesg-foundation/engine/instance" - "github.com/mesg-foundation/engine/protobuf/api" - runnerpb "github.com/mesg-foundation/engine/runner" - servicesdk "github.com/mesg-foundation/engine/sdk/service" - "github.com/mesg-foundation/engine/x/instance" - "github.com/mesg-foundation/engine/x/runner" -) - -// SDK is the runner sdk. -type SDK struct { - serviceSDK *servicesdk.SDK - client *cosmos.Client - container container.Container - port string - engineName string - ipfsEndpoint string -} - -// Filter to apply while listing runners. -type Filter struct { - Address string - InstanceHash hash.Hash -} - -// New returns the runner sdk. -func New(client *cosmos.Client, serviceSDK *servicesdk.SDK, container container.Container, engineName, port, ipfsEndpoint string) *SDK { - sdk := &SDK{ - container: container, - serviceSDK: serviceSDK, - client: client, - port: port, - engineName: engineName, - ipfsEndpoint: ipfsEndpoint, - } - return sdk -} - -// Create creates a new runner. -func (s *SDK) Create(req *api.CreateRunnerRequest) (*runnerpb.Runner, error) { - // calculate instance's hash. - // TODO: this should be merged with the same logic currently in instance sdk - srv, err := s.serviceSDK.Get(req.ServiceHash) - if err != nil { - return nil, err - } - instanceEnv := xos.EnvMergeSlices(srv.Configuration.Env, req.Env) - envHash := hash.Dump(instanceEnv) - // TODO: should be done by instance or runner - instanceHash := hash.Dump(&instancepb.Instance{ - ServiceHash: srv.Hash, - EnvHash: envHash, - }) - acc, err := s.client.GetAccount() - if err != nil { - return nil, err - } - expRunnerHash := hash.Dump(&runnerpb.Runner{ - Address: acc.GetAddress().String(), - InstanceHash: instanceHash, - }) - - if runExisting, _ := s.Get(expRunnerHash); runExisting != nil { - return nil, fmt.Errorf("runner %q already exists", runExisting.Hash) - } - - // start the container - imageHash, err := build(s.container, srv, s.ipfsEndpoint) - if err != nil { - return nil, err - } - _, err = start(s.container, srv, instanceHash, expRunnerHash, imageHash, instanceEnv, s.engineName, s.port) - if err != nil { - return nil, err - } - onError := func() { - stop(s.container, expRunnerHash, srv.Dependencies) - } - - msg := runner.NewMsgCreateRunner(acc.GetAddress(), req.ServiceHash, envHash) - tx, err := s.client.BuildAndBroadcastMsg(msg) - if err != nil { - onError() - return nil, err - } - run, err := s.Get(tx.Data) - if err != nil { - onError() - return nil, err - } - if !run.Hash.Equal(expRunnerHash) { - onError() - return nil, errors.New("calculated runner hash is not the same") - } - return run, nil -} - -// Delete deletes an existing runner. -func (s *SDK) Delete(req *api.DeleteRunnerRequest) error { - // get runner before deleting it - r, err := s.Get(req.Hash) - if err != nil { - return err - } - acc, err := s.client.GetAccount() - if err != nil { - return err - } - msg := runner.NewMsgDeleteRunner(acc.GetAddress(), req.Hash) - _, err = s.client.BuildAndBroadcastMsg(msg) - if err != nil { - return err - } - - var inst instancepb.Instance - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", instance.QuerierRoute, instance.QueryGetInstance, r.InstanceHash), nil, &inst); err != nil { - return err - } - - srv, err := s.serviceSDK.Get(inst.ServiceHash) - if err != nil { - return err - } - - // stop the local running service - if err := stop(s.container, r.Hash, srv.Dependencies); err != nil { - return err - } - - // remove local volume - if req.DeleteData { - if err := deleteData(s.container, srv); err != nil { - return err - } - } - - return nil -} - -// Get returns the runner that matches given hash. -func (s *SDK) Get(hash hash.Hash) (*runnerpb.Runner, error) { - var r runnerpb.Runner - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", runner.QuerierRoute, runner.QueryGetRunner, hash), nil, &r); err != nil { - return nil, err - } - return &r, nil -} - -// List returns all runners. -func (s *SDK) List(f *Filter) ([]*runnerpb.Runner, error) { - var runners []*runnerpb.Runner - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", runner.QuerierRoute, runner.QueryListRunners), nil, &runners); err != nil { - return nil, err - } - // no filter, returns - if f == nil { - return runners, nil - } - // filter results - ret := make([]*runnerpb.Runner, 0) - for _, r := range runners { - if (f.Address == "" || r.Address == f.Address) && - (f.InstanceHash.IsZero() || r.InstanceHash.Equal(f.InstanceHash)) { - ret = append(ret, r) - } - } - return ret, nil -} diff --git a/sdk/sdk.go b/sdk/sdk.go deleted file mode 100644 index 5ddbdbd4d..000000000 --- a/sdk/sdk.go +++ /dev/null @@ -1,38 +0,0 @@ -package sdk - -import ( - "github.com/cskr/pubsub" - "github.com/mesg-foundation/engine/container" - "github.com/mesg-foundation/engine/cosmos" - eventsdk "github.com/mesg-foundation/engine/sdk/event" - executionsdk "github.com/mesg-foundation/engine/sdk/execution" - processesdk "github.com/mesg-foundation/engine/sdk/process" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" - servicesdk "github.com/mesg-foundation/engine/sdk/service" -) - -// SDK exposes all functionalities of MESG Engine. -type SDK struct { - Service *servicesdk.SDK - Execution *executionsdk.SDK - Event *eventsdk.Event - Process *processesdk.SDK - Runner *runnersdk.SDK -} - -// New creates a new SDK with given options. -func New(client *cosmos.Client, kb *cosmos.Keybase, container container.Container, engineName, port, ipfsEndpoint string) *SDK { - ps := pubsub.New(0) - serviceSDK := servicesdk.New(client) - runnerSDK := runnersdk.New(client, serviceSDK, container, engineName, port, ipfsEndpoint) - processSDK := processesdk.New(client) - executionSDK := executionsdk.New(client, serviceSDK, runnerSDK) - eventSDK := eventsdk.New(ps, serviceSDK, client) - return &SDK{ - Service: serviceSDK, - Execution: executionSDK, - Event: eventSDK, - Process: processSDK, - Runner: runnerSDK, - } -} diff --git a/sdk/service/sdk.go b/sdk/service/sdk.go deleted file mode 100644 index 8e2ac18c7..000000000 --- a/sdk/service/sdk.go +++ /dev/null @@ -1,71 +0,0 @@ -package servicesdk - -import ( - "fmt" - - "github.com/mesg-foundation/engine/cosmos" - "github.com/mesg-foundation/engine/hash" - "github.com/mesg-foundation/engine/protobuf/api" - servicepb "github.com/mesg-foundation/engine/service" - "github.com/mesg-foundation/engine/x/service" -) - -// SDK is the service sdk. -type SDK struct { - client *cosmos.Client -} - -// New returns the service sdk. -func New(client *cosmos.Client) *SDK { - return &SDK{client: client} -} - -// Create creates a new service from definition. -func (s *SDK) Create(req *api.CreateServiceRequest) (*servicepb.Service, error) { - acc, err := s.client.GetAccount() - if err != nil { - return nil, err - } - msg := service.NewMsgCreateService(acc.GetAddress(), req) - tx, err := s.client.BuildAndBroadcastMsg(msg) - if err != nil { - return nil, err - } - return s.Get(tx.Data) -} - -// Get returns the service that matches given hash. -func (s *SDK) Get(hash hash.Hash) (*servicepb.Service, error) { - var se servicepb.Service - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", service.QuerierRoute, service.QueryGetService, hash), nil, &se); err != nil { - return nil, err - } - return &se, nil -} - -// List returns all services. -func (s *SDK) List() ([]*servicepb.Service, error) { - var services []*servicepb.Service - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", service.QuerierRoute, service.QueryListService), nil, &services); err != nil { - return nil, err - } - return services, nil -} - -// Exists returns if a service already exists. -func (s *SDK) Exists(hash hash.Hash) (bool, error) { - var exists bool - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s/%s", service.QuerierRoute, service.QueryExistService, hash), nil, &exists); err != nil { - return false, err - } - return exists, nil -} - -// Hash returns the calculate hash of a service. -func (s *SDK) Hash(req *api.CreateServiceRequest) (hash.Hash, error) { - var h hash.Hash - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", service.QuerierRoute, service.QueryHashService), req, &h); err != nil { - return nil, err - } - return h, nil -} diff --git a/server/grpc/api/event.go b/server/grpc/api/event.go index fe9629a72..6c918fb32 100644 --- a/server/grpc/api/event.go +++ b/server/grpc/api/event.go @@ -5,20 +5,20 @@ import ( "errors" "fmt" + "github.com/mesg-foundation/engine/event" + "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/protobuf/acknowledgement" "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" - eventsdk "github.com/mesg-foundation/engine/sdk/event" ) // EventServer serve event functions. type EventServer struct { - sdk *sdk.SDK + ep *publisher.EventPublisher } // NewEventServer creates a new EventServer. -func NewEventServer(sdk *sdk.SDK) *EventServer { - return &EventServer{sdk: sdk} +func NewEventServer(ep *publisher.EventPublisher) *EventServer { + return &EventServer{ep: ep} } // Create creates a new event. @@ -27,7 +27,7 @@ func (s *EventServer) Create(ctx context.Context, req *api.CreateEventRequest) ( return nil, errors.New("create event: key missing") } - event, err := s.sdk.Event.Create(req.InstanceHash, req.Key, req.Data) + event, err := s.ep.Publish(req.InstanceHash, req.Key, req.Data) if err != nil { return nil, fmt.Errorf("create event: data %s", err) } @@ -37,15 +37,15 @@ func (s *EventServer) Create(ctx context.Context, req *api.CreateEventRequest) ( // Stream returns stream of events. func (s *EventServer) Stream(req *api.StreamEventRequest, resp api.Event_StreamServer) error { - var f *eventsdk.Filter + var f *event.Filter if req.Filter != nil { - f = &eventsdk.Filter{ + f = &event.Filter{ Hash: req.Filter.Hash, InstanceHash: req.Filter.InstanceHash, Key: req.Filter.Key, } } - stream := s.sdk.Event.GetStream(f) + stream := s.ep.GetStream(f) defer stream.Close() // send header to notify client that the stream is ready. diff --git a/server/grpc/api/execution.go b/server/grpc/api/execution.go index 2ba492db9..438d3a8dd 100644 --- a/server/grpc/api/execution.go +++ b/server/grpc/api/execution.go @@ -3,37 +3,34 @@ package api import ( "context" + "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/execution" "github.com/mesg-foundation/engine/protobuf/acknowledgement" "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" ) // ExecutionServer serve execution functions. type ExecutionServer struct { - sdk *sdk.SDK + mc *cosmos.ModuleClient } // NewExecutionServer creates a new ExecutionServer. -func NewExecutionServer(sdk *sdk.SDK) *ExecutionServer { - return &ExecutionServer{sdk: sdk} +func NewExecutionServer(mc *cosmos.ModuleClient) *ExecutionServer { + return &ExecutionServer{mc: mc} } // Create creates an execution. func (s *ExecutionServer) Create(ctx context.Context, req *api.CreateExecutionRequest) (*api.CreateExecutionResponse, error) { - exec, err := s.sdk.Execution.Create(req) + exec, err := s.mc.CreateExecution(req) if err != nil { return nil, err } - - return &api.CreateExecutionResponse{ - Hash: exec.Hash, - }, nil + return &api.CreateExecutionResponse{Hash: exec.Hash}, nil } // Get returns execution from given hash. func (s *ExecutionServer) Get(ctx context.Context, req *api.GetExecutionRequest) (*execution.Execution, error) { - return s.sdk.Execution.Get(req.Hash) + return s.mc.GetExecution(req.Hash) } // Stream returns stream of executions. @@ -41,7 +38,7 @@ func (s *ExecutionServer) Stream(req *api.StreamExecutionRequest, resp api.Execu ctx, cancel := context.WithCancel(context.Background()) defer cancel() - stream, errC, err := s.sdk.Execution.Stream(ctx, req) + stream, errC, err := s.mc.StreamExecution(ctx, req) if err != nil { return err } @@ -66,7 +63,7 @@ func (s *ExecutionServer) Stream(req *api.StreamExecutionRequest, resp api.Execu // Update updates execution from given hash. func (s *ExecutionServer) Update(ctx context.Context, req *api.UpdateExecutionRequest) (*api.UpdateExecutionResponse, error) { - if _, err := s.sdk.Execution.Update(req); err != nil { + if _, err := s.mc.UpdateExecution(req); err != nil { return nil, err } return &api.UpdateExecutionResponse{}, nil @@ -74,7 +71,7 @@ func (s *ExecutionServer) Update(ctx context.Context, req *api.UpdateExecutionRe // List returns all executions. func (s *ExecutionServer) List(ctx context.Context, req *api.ListExecutionRequest) (*api.ListExecutionResponse, error) { - executions, err := s.sdk.Execution.List() + executions, err := s.mc.ListExecution() if err != nil { return nil, err } diff --git a/server/grpc/api/instance.go b/server/grpc/api/instance.go index 645e6d6a5..14146239d 100644 --- a/server/grpc/api/instance.go +++ b/server/grpc/api/instance.go @@ -2,46 +2,32 @@ package api import ( "context" - "fmt" "github.com/mesg-foundation/engine/cosmos" instancepb "github.com/mesg-foundation/engine/instance" protobuf_api "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" - "github.com/mesg-foundation/engine/x/instance" ) // InstanceServer is the type to aggregate all Instance APIs. type InstanceServer struct { - sdk *sdk.SDK - client *cosmos.Client + mc *cosmos.ModuleClient } // NewInstanceServer creates a new ServiceServer. -func NewInstanceServer(sdk *sdk.SDK, client *cosmos.Client) *InstanceServer { - return &InstanceServer{ - sdk: sdk, - client: client, - } +func NewInstanceServer(mc *cosmos.ModuleClient) *InstanceServer { + return &InstanceServer{mc: mc} } // List instances. -func (s *InstanceServer) List(ctx context.Context, request *protobuf_api.ListInstanceRequest) (*protobuf_api.ListInstanceResponse, error) { - var instances []*instancepb.Instance - if err := s.client.QueryJSON(fmt.Sprintf("custom/%s/%s", instance.QuerierRoute, instance.QueryListInstances), request.Filter, &instances); err != nil { +func (s *InstanceServer) List(ctx context.Context, req *protobuf_api.ListInstanceRequest) (*protobuf_api.ListInstanceResponse, error) { + out, err := s.mc.ListInstance(req) + if err != nil { return nil, err } - return &protobuf_api.ListInstanceResponse{Instances: instances}, nil + return &protobuf_api.ListInstanceResponse{Instances: out}, nil } // Get retrives instance. -func (s *InstanceServer) Get(ctx context.Context, request *protobuf_api.GetInstanceRequest) (*instancepb.Instance, error) { - var inst instancepb.Instance - err := s.client.QueryJSON( - fmt.Sprintf("custom/%s/%s/%s", instance.QuerierRoute, instance.QueryGetInstance, request.Hash), - nil, &inst) - if err != nil { - return nil, err - } - return &inst, nil +func (s *InstanceServer) Get(ctx context.Context, req *protobuf_api.GetInstanceRequest) (*instancepb.Instance, error) { + return s.mc.GetInstance(req.Hash) } diff --git a/server/grpc/api/ownership.go b/server/grpc/api/ownership.go index be338c5d6..9a0803de0 100644 --- a/server/grpc/api/ownership.go +++ b/server/grpc/api/ownership.go @@ -4,31 +4,24 @@ import ( "context" "github.com/mesg-foundation/engine/cosmos" - ownershippb "github.com/mesg-foundation/engine/ownership" - protobuf_api "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" - "github.com/mesg-foundation/engine/x/ownership" + "github.com/mesg-foundation/engine/protobuf/api" ) // OwnershipServer is the type to aggregate all Ownership APIs. type OwnershipServer struct { - sdk *sdk.SDK - client *cosmos.Client + mc *cosmos.ModuleClient } // NewOwnershipServer creates a new OwnershipServer. -func NewOwnershipServer(sdk *sdk.SDK, client *cosmos.Client) *OwnershipServer { - return &OwnershipServer{ - sdk: sdk, - client: client, - } +func NewOwnershipServer(mc *cosmos.ModuleClient) *OwnershipServer { + return &OwnershipServer{mc: mc} } // List returns all ownerships. -func (s *OwnershipServer) List(ctx context.Context, req *protobuf_api.ListOwnershipRequest) (*protobuf_api.ListOwnershipResponse, error) { - var ownerships []*ownershippb.Ownership - if err := s.client.QueryJSON("custom/"+ownership.QuerierRoute+"/"+ownership.QueryListOwnerships, nil, &ownerships); err != nil { +func (s *OwnershipServer) List(ctx context.Context, req *api.ListOwnershipRequest) (*api.ListOwnershipResponse, error) { + out, err := s.mc.ListOwnership() + if err != nil { return nil, err } - return &protobuf_api.ListOwnershipResponse{Ownerships: ownerships}, nil + return &api.ListOwnershipResponse{Ownerships: out}, nil } diff --git a/server/grpc/api/process.go b/server/grpc/api/process.go index 6a8598369..6434b39c0 100644 --- a/server/grpc/api/process.go +++ b/server/grpc/api/process.go @@ -3,24 +3,24 @@ package api import ( "context" + "github.com/mesg-foundation/engine/cosmos" "github.com/mesg-foundation/engine/process" "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" ) // ProcessServer is the type to aggregate all Service APIs. type ProcessServer struct { - sdk *sdk.SDK + mc *cosmos.ModuleClient } // NewProcessServer creates a new ProcessServer. -func NewProcessServer(sdk *sdk.SDK) *ProcessServer { - return &ProcessServer{sdk: sdk} +func NewProcessServer(mc *cosmos.ModuleClient) *ProcessServer { + return &ProcessServer{mc: mc} } // Create creates a new process. func (s *ProcessServer) Create(ctx context.Context, req *api.CreateProcessRequest) (*api.CreateProcessResponse, error) { - wf, err := s.sdk.Process.Create(req) + wf, err := s.mc.CreateProcess(req) if err != nil { return nil, err } @@ -29,21 +29,19 @@ func (s *ProcessServer) Create(ctx context.Context, req *api.CreateProcessReques // Delete deletes process by hash or sid. func (s *ProcessServer) Delete(ctx context.Context, req *api.DeleteProcessRequest) (*api.DeleteProcessResponse, error) { - return &api.DeleteProcessResponse{}, s.sdk.Process.Delete(req) + return &api.DeleteProcessResponse{}, s.mc.DeleteProcess(req) } // Get returns process from given hash. func (s *ProcessServer) Get(ctx context.Context, req *api.GetProcessRequest) (*process.Process, error) { - return s.sdk.Process.Get(req.Hash) + return s.mc.GetProcess(req.Hash) } // List returns all processes. func (s *ProcessServer) List(ctx context.Context, req *api.ListProcessRequest) (*api.ListProcessResponse, error) { - processes, err := s.sdk.Process.List() + processes, err := s.mc.ListProcess() if err != nil { return nil, err } - return &api.ListProcessResponse{ - Processes: processes, - }, nil + return &api.ListProcessResponse{Processes: processes}, nil } diff --git a/server/grpc/api/runner.go b/server/grpc/api/runner.go index 28ef58001..ebd48f545 100644 --- a/server/grpc/api/runner.go +++ b/server/grpc/api/runner.go @@ -3,57 +3,61 @@ package api import ( "context" - protobuf_api "github.com/mesg-foundation/engine/protobuf/api" + "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/protobuf/api" "github.com/mesg-foundation/engine/runner" - "github.com/mesg-foundation/engine/sdk" - runnersdk "github.com/mesg-foundation/engine/sdk/runner" + "github.com/mesg-foundation/engine/runner/builder" ) // RunnerServer is the type to aggregate all Runner APIs. type RunnerServer struct { - sdk *sdk.SDK + mc *cosmos.ModuleClient + b *builder.Builder } // NewRunnerServer creates a new RunnerServer. -func NewRunnerServer(sdk *sdk.SDK) *RunnerServer { - return &RunnerServer{sdk: sdk} +func NewRunnerServer(mc *cosmos.ModuleClient, b *builder.Builder) *RunnerServer { + return &RunnerServer{ + mc: mc, + b: b, + } } // Create creates a new runner. -func (s *RunnerServer) Create(ctx context.Context, req *protobuf_api.CreateRunnerRequest) (*protobuf_api.CreateRunnerResponse, error) { - srv, err := s.sdk.Runner.Create(req) +func (s *RunnerServer) Create(ctx context.Context, req *api.CreateRunnerRequest) (*api.CreateRunnerResponse, error) { + r, err := s.b.Create(req) if err != nil { return nil, err } - return &protobuf_api.CreateRunnerResponse{Hash: srv.Hash}, nil + return &api.CreateRunnerResponse{Hash: r.Hash}, nil } // Delete deletes a runner. -func (s *RunnerServer) Delete(ctx context.Context, req *protobuf_api.DeleteRunnerRequest) (*protobuf_api.DeleteRunnerResponse, error) { - if err := s.sdk.Runner.Delete(req); err != nil { +func (s *RunnerServer) Delete(ctx context.Context, req *api.DeleteRunnerRequest) (*api.DeleteRunnerResponse, error) { + if err := s.b.Delete(req); err != nil { return nil, err } - return &protobuf_api.DeleteRunnerResponse{}, nil + return &api.DeleteRunnerResponse{}, nil } // Get returns runner from given hash. -func (s *RunnerServer) Get(ctx context.Context, req *protobuf_api.GetRunnerRequest) (*runner.Runner, error) { - return s.sdk.Runner.Get(req.Hash) +func (s *RunnerServer) Get(ctx context.Context, req *api.GetRunnerRequest) (*runner.Runner, error) { + return s.mc.GetRunner(req.Hash) } // List returns all runners. -func (s *RunnerServer) List(ctx context.Context, req *protobuf_api.ListRunnerRequest) (*protobuf_api.ListRunnerResponse, error) { - var filter *runnersdk.Filter +func (s *RunnerServer) List(ctx context.Context, req *api.ListRunnerRequest) (*api.ListRunnerResponse, error) { + var f *cosmos.FilterRunner if req.Filter != nil { - filter = &runnersdk.Filter{ + f = &cosmos.FilterRunner{ Address: req.Filter.Address, InstanceHash: req.Filter.InstanceHash, } } - runners, err := s.sdk.Runner.List(filter) + runners, err := s.mc.ListRunner(f) if err != nil { return nil, err } - return &protobuf_api.ListRunnerResponse{Runners: runners}, nil + return &api.ListRunnerResponse{Runners: runners}, nil } diff --git a/server/grpc/api/service.go b/server/grpc/api/service.go index bd15b1261..63cf2970c 100644 --- a/server/grpc/api/service.go +++ b/server/grpc/api/service.go @@ -3,24 +3,24 @@ package api import ( "context" + "github.com/mesg-foundation/engine/cosmos" protobuf_api "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" "github.com/mesg-foundation/engine/service" ) // ServiceServer is the type to aggregate all Service APIs. type ServiceServer struct { - sdk *sdk.SDK + mc *cosmos.ModuleClient } // NewServiceServer creates a new ServiceServer. -func NewServiceServer(sdk *sdk.SDK) *ServiceServer { - return &ServiceServer{sdk: sdk} +func NewServiceServer(mc *cosmos.ModuleClient) *ServiceServer { + return &ServiceServer{mc: mc} } // Create creates a new service from definition. func (s *ServiceServer) Create(ctx context.Context, req *protobuf_api.CreateServiceRequest) (*protobuf_api.CreateServiceResponse, error) { - srv, err := s.sdk.Service.Create(req) + srv, err := s.mc.CreateService(req) if err != nil { return nil, err } @@ -29,12 +29,12 @@ func (s *ServiceServer) Create(ctx context.Context, req *protobuf_api.CreateServ // Get returns service from given hash. func (s *ServiceServer) Get(ctx context.Context, req *protobuf_api.GetServiceRequest) (*service.Service, error) { - return s.sdk.Service.Get(req.Hash) + return s.mc.GetService(req.Hash) } // List returns all services. func (s *ServiceServer) List(ctx context.Context, req *protobuf_api.ListServiceRequest) (*protobuf_api.ListServiceResponse, error) { - services, err := s.sdk.Service.List() + services, err := s.mc.ListService() if err != nil { return nil, err } @@ -44,22 +44,18 @@ func (s *ServiceServer) List(ctx context.Context, req *protobuf_api.ListServiceR // Exists returns if a service already exists. func (s *ServiceServer) Exists(ctx context.Context, req *protobuf_api.ExistsServiceRequest) (*protobuf_api.ExistsServiceResponse, error) { - exists, err := s.sdk.Service.Exists(req.Hash) + exist, err := s.mc.ExistService(req.Hash) if err != nil { return nil, err } - return &protobuf_api.ExistsServiceResponse{ - Exists: exists, - }, nil + return &protobuf_api.ExistsServiceResponse{Exists: exist}, nil } // Hash returns the calculated hash of a service request. func (s *ServiceServer) Hash(ctx context.Context, req *protobuf_api.CreateServiceRequest) (*protobuf_api.HashServiceResponse, error) { - h, err := s.sdk.Service.Hash(req) + h, err := s.mc.HashService(req) if err != nil { return nil, err } - return &protobuf_api.HashServiceResponse{ - Hash: h, - }, nil + return &protobuf_api.HashServiceResponse{Hash: h}, nil } diff --git a/server/grpc/server.go b/server/grpc/server.go index d43ab13f6..aca365dd5 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -8,11 +8,11 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/mesg-foundation/engine/config" "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/event/publisher" "github.com/mesg-foundation/engine/ext/xvalidator" protobuf_api "github.com/mesg-foundation/engine/protobuf/api" - "github.com/mesg-foundation/engine/sdk" + "github.com/mesg-foundation/engine/runner/builder" "github.com/mesg-foundation/engine/server/grpc/api" "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -23,17 +23,17 @@ import ( // Server contains the server config. type Server struct { instance *grpc.Server - sdk *sdk.SDK - cfg *config.Config - client *cosmos.Client + mc *cosmos.ModuleClient + ep *publisher.EventPublisher + b *builder.Builder } // New returns a new gRPC server. -func New(sdk *sdk.SDK, cfg *config.Config, client *cosmos.Client) *Server { +func New(mc *cosmos.ModuleClient, ep *publisher.EventPublisher, b *builder.Builder) *Server { return &Server{ - sdk: sdk, - cfg: cfg, - client: client, + mc: mc, + ep: ep, + b: b, } } @@ -81,13 +81,13 @@ func validateInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryS // register all server func (s *Server) register() { - protobuf_api.RegisterEventServer(s.instance, api.NewEventServer(s.sdk)) - protobuf_api.RegisterExecutionServer(s.instance, api.NewExecutionServer(s.sdk)) - protobuf_api.RegisterInstanceServer(s.instance, api.NewInstanceServer(s.sdk, s.client)) - protobuf_api.RegisterServiceServer(s.instance, api.NewServiceServer(s.sdk)) - protobuf_api.RegisterProcessServer(s.instance, api.NewProcessServer(s.sdk)) - protobuf_api.RegisterOwnershipServer(s.instance, api.NewOwnershipServer(s.sdk, s.client)) - protobuf_api.RegisterRunnerServer(s.instance, api.NewRunnerServer(s.sdk)) + protobuf_api.RegisterEventServer(s.instance, api.NewEventServer(s.ep)) + protobuf_api.RegisterExecutionServer(s.instance, api.NewExecutionServer(s.mc)) + protobuf_api.RegisterInstanceServer(s.instance, api.NewInstanceServer(s.mc)) + protobuf_api.RegisterServiceServer(s.instance, api.NewServiceServer(s.mc)) + protobuf_api.RegisterProcessServer(s.instance, api.NewProcessServer(s.mc)) + protobuf_api.RegisterOwnershipServer(s.instance, api.NewOwnershipServer(s.mc)) + protobuf_api.RegisterRunnerServer(s.instance, api.NewRunnerServer(s.mc, s.b)) reflection.Register(s.instance) } diff --git a/x/instance/internal/keeper/querier.go b/x/instance/internal/keeper/querier.go index ace4b7df2..f4ae39726 100644 --- a/x/instance/internal/keeper/querier.go +++ b/x/instance/internal/keeper/querier.go @@ -42,14 +42,14 @@ func getInstance(ctx sdk.Context, path []string, k Keeper) ([]byte, error) { } func listInstance(ctx sdk.Context, req abci.RequestQuery, k Keeper) ([]byte, error) { - var f *api.ListInstanceRequest_Filter + var f *api.ListInstanceRequest if len(req.Data) > 0 { if err := types.ModuleCdc.UnmarshalJSON(req.Data, &f); err != nil { return nil, err } } - instances, err := k.List(ctx, f) + instances, err := k.List(ctx, f.Filter) if err != nil { return nil, err }