diff --git a/e2e/execution_test.go b/e2e/execution_test.go index b52fc1cb5..854e012eb 100644 --- a/e2e/execution_test.go +++ b/e2e/execution_test.go @@ -37,14 +37,14 @@ func testExecution(t *testing.T) { }) require.NoError(t, err) - // recive in progress status + // receive in progress status exec, err := stream.Recv() require.NoError(t, err) require.Equal(t, resp.Hash, exec.Hash) require.Equal(t, "ping", exec.TaskKey) require.Equal(t, execution.Status_InProgress, exec.Status) - // recive completed status + // receive completed status exec, err = stream.Recv() require.NoError(t, err) require.Equal(t, resp.Hash, exec.Hash) diff --git a/e2e/testdata/test-service.json b/e2e/testdata/test-service.json index a6cc6bc94..21c25ec50 100644 --- a/e2e/testdata/test-service.json +++ b/e2e/testdata/test-service.json @@ -91,5 +91,5 @@ ] } ], - "source": "QmPG1Ze96pH1EgVMWsGKM33jXoG63rigMncSEqZXP7oncq" + "source": "QmPkjHLWUwTVjJsy7ioFkxPL9yh7URYK2AUYYkTzJTmhJQ" } diff --git a/e2e/testdata/test-service/Dockerfile b/e2e/testdata/test-service/Dockerfile index ffd3fdcc9..45bdf004d 100644 --- a/e2e/testdata/test-service/Dockerfile +++ b/e2e/testdata/test-service/Dockerfile @@ -1,9 +1,7 @@ FROM golang:1.13 - WORKDIR /app - +COPY go.mod go.sum ./ +RUN go mod download COPY . . - RUN go build -o "mesg-test" - CMD [ "./mesg-test" ] diff --git a/e2e/testdata/test-service/main.go b/e2e/testdata/test-service/main.go index ec66d0a6b..085aab5a7 100644 --- a/e2e/testdata/test-service/main.go +++ b/e2e/testdata/test-service/main.go @@ -19,6 +19,7 @@ const ( // env variables for configure mesg client. envMesgEndpoint = "MESG_ENDPOINT" envMesgInstanceHash = "MESG_INSTANCE_HASH" + envMesgRunnerHash = "MESG_RUNNER_HASH" ) // Client is a client to connect to all mesg exposed API. @@ -29,6 +30,9 @@ type Client struct { // instance hash that could be used in api calls. InstanceHash hash.Hash + + // runner hash that could be used in api calls. + RunnerHash hash.Hash } // New creates a new client from env variables supplied by mesg engine. @@ -43,6 +47,11 @@ func New() (*Client, error) { return nil, fmt.Errorf("client: error with mesg's instance hash env(%s): %s", envMesgInstanceHash, err.Error()) } + runnerHash, err := hash.Decode(os.Getenv(envMesgRunnerHash)) + if err != nil { + return nil, fmt.Errorf("client: error with mesg's runner hash env(%s): %s", envMesgRunnerHash, err.Error()) + } + dialoptions := []grpc.DialOption{ // Keep alive prevents Docker network to drop TCP idle connections after 15 minutes. // See: https://forum.mesg.com/t/solution-summary-for-docker-dropping-connections-after-15-min/246 @@ -62,6 +71,7 @@ func New() (*Client, error) { ExecutionClient: pb.NewExecutionClient(conn), EventClient: pb.NewEventClient(conn), InstanceHash: instanceHash, + RunnerHash: runnerHash, }, nil } @@ -72,7 +82,7 @@ func main() { if err != nil { log.Fatal(err) } - log.Printf("connect to %s\n", os.Getenv(envMesgEndpoint)) + log.Printf("connected to %s\n", os.Getenv(envMesgEndpoint)) stream, err := client.ExecutionClient.Stream(context.Background(), &pb.StreamExecutionRequest{ Filter: &pb.StreamExecutionRequest_Filter{ @@ -83,7 +93,7 @@ func main() { if err != nil { log.Fatal(err) } - log.Printf("create a stream\n") + log.Printf("create execution stream\n") if _, err := client.EventClient.Create(context.Background(), &pb.CreateEventRequest{ InstanceHash: client.InstanceHash, @@ -91,13 +101,14 @@ func main() { }); err != nil { log.Fatal(err) } + log.Printf("emit test_service_ready event\n") for { exec, err := stream.Recv() if err != nil { log.Fatal(err) } - log.Printf("recive exeuction %s %s\n", exec.TaskKey, exec.Hash) + log.Printf("receive execution %s %s\n", exec.TaskKey, exec.Hash) req := &pb.UpdateExecutionRequest{ Hash: exec.Hash, @@ -136,7 +147,7 @@ func main() { } if _, err := client.EventClient.Create(context.Background(), &pb.CreateEventRequest{ - InstanceHash: exec.InstanceHash, + InstanceHash: client.InstanceHash, Key: exec.TaskKey + "_ok", Data: &types.Struct{ Fields: map[string]*types.Value{ @@ -150,5 +161,6 @@ func main() { }); err != nil { log.Fatal(err) } + log.Printf("emit " + exec.TaskKey + "_ok event\n") } } diff --git a/sdk/runner/container.go b/sdk/runner/container.go index ff1f01d61..1d1d6d0ba 100644 --- a/sdk/runner/container.go +++ b/sdk/runner/container.go @@ -28,13 +28,12 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string) } defer os.RemoveAll(path) - // TODO: the ipfs url should be in config resp, err := http.Get(ipfsEndpoint + srv.Source) if err != nil { return "", err } if resp.StatusCode != 200 { - return "", errors.New("service's source code is not reachable") + return "", errors.New("service's source code is not reachable, status: " + resp.Status + ", url: " + ipfsEndpoint + srv.Source) } defer resp.Body.Close() @@ -52,10 +51,10 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string) } // Start starts the service. -func start(cont container.Container, srv *service.Service, instanceHash hash.Hash, imageHash string, env []string, engineName, port string) (serviceIDs []string, err error) { +func start(cont container.Container, srv *service.Service, instanceHash hash.Hash, runnerHash hash.Hash, imageHash string, env []string, engineName, port string) (serviceIDs []string, err error) { endpoint := net.JoinHostPort(engineName, port) - instNamespace := instanceNamespace(instanceHash) - networkID, err := cont.CreateNetwork(instNamespace) + namespace := namespace(runnerHash) + networkID, err := cont.CreateNetwork(namespace) if err != nil { return nil, err } @@ -72,12 +71,13 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has return nil, err } configs = append(configs, container.ServiceOptions{ - Namespace: dependencyNamespace(instNamespace, d.Key), + Namespace: dependencyNamespace(namespace, d.Key), Labels: map[string]string{ "mesg.engine": engineName, "mesg.sid": srv.Sid, "mesg.service": srv.Hash.String(), "mesg.instance": instanceHash.String(), + "mesg.runner": runnerHash.String(), "mesg.dependency": d.Key, }, Image: d.Image, @@ -100,12 +100,13 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has return nil, err } configs = append(configs, container.ServiceOptions{ - Namespace: dependencyNamespace(instNamespace, service.MainServiceKey), + Namespace: dependencyNamespace(namespace, service.MainServiceKey), Labels: map[string]string{ "mesg.engine": engineName, "mesg.sid": srv.Sid, "mesg.service": srv.Hash.String(), "mesg.instance": instanceHash.String(), + "mesg.runner": runnerHash.String(), "mesg.dependency": service.MainServiceKey, }, Image: imageHash, @@ -114,6 +115,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has Env: xos.EnvMergeSlices(env, []string{ "MESG_TOKEN=" + instanceHash.String(), "MESG_INSTANCE_HASH=" + instanceHash.String(), + "MESG_RUNNER_HASH=" + runnerHash.String(), "MESG_ENDPOINT=" + endpoint, }), Mounts: append(volumes, volumesFrom...), @@ -129,7 +131,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has for _, c := range configs { serviceID, err := cont.StartService(c) if err != nil { - stop(cont, instanceHash, srv.Dependencies) + stop(cont, runnerHash, srv.Dependencies) return nil, err } serviceIDs = append(serviceIDs, serviceID) @@ -139,18 +141,18 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has } // Stop stops an instance. -func stop(cont container.Container, instanceHash hash.Hash, dependencies []*service.Service_Dependency) error { +func stop(cont container.Container, runnerHash hash.Hash, dependencies []*service.Service_Dependency) error { var ( wg sync.WaitGroup errs xerrors.SyncErrors - sNamespace = instanceNamespace(instanceHash) + namespace = namespace(runnerHash) namespaces = make([]string, 0) ) for _, d := range dependencies { - namespaces = append(namespaces, dependencyNamespace(sNamespace, d.Key)) + namespaces = append(namespaces, dependencyNamespace(namespace, d.Key)) } - namespaces = append(namespaces, dependencyNamespace(sNamespace, service.MainServiceKey)) + namespaces = append(namespaces, dependencyNamespace(namespace, service.MainServiceKey)) for _, namespace := range namespaces { wg.Add(1) @@ -166,7 +168,7 @@ func stop(cont container.Container, instanceHash hash.Hash, dependencies []*serv return err } - return cont.DeleteNetwork(sNamespace) + return cont.DeleteNetwork(namespace) } // deleteData deletes the data volumes of instance and its dependencies. @@ -201,14 +203,14 @@ func deleteData(cont container.Container, s *service.Service) error { return errs.ErrorOrNil() } -// instanceNamespace returns the namespace of the service. -func instanceNamespace(hash hash.Hash) string { +// namespace returns the namespace of the service. +func namespace(hash hash.Hash) string { return hash.String() } // dependencyNamespace builds the namespace of a dependency. -func dependencyNamespace(instanceNamespace string, dependencyKey string) string { - return hash.Dump(instanceNamespace + dependencyKey).String() +func dependencyNamespace(namespace string, dependencyKey string) string { + return hash.Dump(namespace + dependencyKey).String() } func convertPorts(dPorts []string) []container.Port { diff --git a/sdk/runner/sdk.go b/sdk/runner/sdk.go index b2d6fb35b..4a0adb3b1 100644 --- a/sdk/runner/sdk.go +++ b/sdk/runner/sdk.go @@ -1,6 +1,8 @@ package runnersdk import ( + "errors" + cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/mesg-foundation/engine/container" "github.com/mesg-foundation/engine/cosmos" @@ -67,32 +69,45 @@ func (s *SDK) Create(req *api.CreateRunnerRequest, accountName, accountPassword } instanceEnv := xos.EnvMergeSlices(srv.Configuration.Env, req.Env) envHash := hash.Dump(instanceEnv) - // TODO: should be done by instance + // TODO: should be done by instance or runner instanceHash := hash.Dump(&instance.Instance{ ServiceHash: srv.Hash, EnvHash: envHash, }) + expRunnerHash := hash.Dump(&runner.Runner{ + Address: user.String(), + InstanceHash: instanceHash, + }) // start the container imageHash, err := build(s.container, srv, s.ipfsEndpoint) if err != nil { return nil, err } - _, err = start(s.container, srv, instanceHash, imageHash, instanceEnv, s.engineName, s.port) + _, err = start(s.container, srv, instanceHash, expRunnerHash, imageHash, instanceEnv, s.engineName, s.port) if err != nil { return nil, err } onError := func() { - stop(s.container, instanceHash, srv.Dependencies) + stop(s.container, expRunnerHash, srv.Dependencies) } msg := newMsgCreateRunner(user, req.ServiceHash, envHash) tx, err := s.client.BuildAndBroadcastMsg(msg, accountName, accountPassword) if err != nil { - defer onError() + onError() + return nil, err + } + run, err := s.Get(tx.Data) + if err != nil { + onError() return nil, err } - return s.Get(tx.Data) + if !run.Hash.Equal(expRunnerHash) { + onError() + return nil, errors.New("calculated runner hash is not the same") + } + return run, nil } // Delete deletes an existing runner. @@ -130,7 +145,7 @@ func (s *SDK) Delete(req *api.DeleteRunnerRequest, accountName, accountPassword } // stop the local running service - if err := stop(s.container, inst.Hash, srv.Dependencies); err != nil { + if err := stop(s.container, runner.Hash, srv.Dependencies); err != nil { return err }