Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject runner hash in docker services #1512

Merged
merged 6 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func testExecution(t *testing.T) {
})
require.NoError(t, err)

// recive in progress status
// receive in progress status
exec, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, resp.Hash, exec.Hash)
require.Equal(t, "ping", exec.TaskKey)
require.Equal(t, execution.Status_InProgress, exec.Status)

// recive completed status
// receive completed status
exec, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, resp.Hash, exec.Hash)
Expand Down
2 changes: 1 addition & 1 deletion e2e/testdata/test-service.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,5 @@
]
}
],
"source": "QmPG1Ze96pH1EgVMWsGKM33jXoG63rigMncSEqZXP7oncq"
"source": "QmPkjHLWUwTVjJsy7ioFkxPL9yh7URYK2AUYYkTzJTmhJQ"
}
6 changes: 2 additions & 4 deletions e2e/testdata/test-service/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
FROM golang:1.13

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download
COPY . .

RUN go build -o "mesg-test"

CMD [ "./mesg-test" ]
20 changes: 16 additions & 4 deletions e2e/testdata/test-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
// env variables for configure mesg client.
envMesgEndpoint = "MESG_ENDPOINT"
envMesgInstanceHash = "MESG_INSTANCE_HASH"
envMesgRunnerHash = "MESG_RUNNER_HASH"
krhubert marked this conversation as resolved.
Show resolved Hide resolved
)

// Client is a client to connect to all mesg exposed API.
Expand All @@ -29,6 +30,9 @@ type Client struct {

// instance hash that could be used in api calls.
InstanceHash hash.Hash

// runner hash that could be used in api calls.
RunnerHash hash.Hash
}

// New creates a new client from env variables supplied by mesg engine.
Expand All @@ -43,6 +47,11 @@ func New() (*Client, error) {
return nil, fmt.Errorf("client: error with mesg's instance hash env(%s): %s", envMesgInstanceHash, err.Error())
}

runnerHash, err := hash.Decode(os.Getenv(envMesgRunnerHash))
if err != nil {
return nil, fmt.Errorf("client: error with mesg's runner hash env(%s): %s", envMesgRunnerHash, err.Error())
}

dialoptions := []grpc.DialOption{
// Keep alive prevents Docker network to drop TCP idle connections after 15 minutes.
// See: https://forum.mesg.com/t/solution-summary-for-docker-dropping-connections-after-15-min/246
Expand All @@ -62,6 +71,7 @@ func New() (*Client, error) {
ExecutionClient: pb.NewExecutionClient(conn),
EventClient: pb.NewEventClient(conn),
InstanceHash: instanceHash,
RunnerHash: runnerHash,
}, nil
}

Expand All @@ -72,7 +82,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
log.Printf("connect to %s\n", os.Getenv(envMesgEndpoint))
log.Printf("connected to %s\n", os.Getenv(envMesgEndpoint))

stream, err := client.ExecutionClient.Stream(context.Background(), &pb.StreamExecutionRequest{
Filter: &pb.StreamExecutionRequest_Filter{
Expand All @@ -83,21 +93,22 @@ func main() {
if err != nil {
log.Fatal(err)
}
log.Printf("create a stream\n")
log.Printf("create execution stream\n")

if _, err := client.EventClient.Create(context.Background(), &pb.CreateEventRequest{
InstanceHash: client.InstanceHash,
Key: "test_service_ready",
}); err != nil {
log.Fatal(err)
}
log.Printf("emit test_service_ready event\n")

for {
exec, err := stream.Recv()
if err != nil {
log.Fatal(err)
}
log.Printf("recive exeuction %s %s\n", exec.TaskKey, exec.Hash)
log.Printf("receive execution %s %s\n", exec.TaskKey, exec.Hash)

req := &pb.UpdateExecutionRequest{
Hash: exec.Hash,
Expand Down Expand Up @@ -136,7 +147,7 @@ func main() {
}

if _, err := client.EventClient.Create(context.Background(), &pb.CreateEventRequest{
InstanceHash: exec.InstanceHash,
InstanceHash: client.InstanceHash,
krhubert marked this conversation as resolved.
Show resolved Hide resolved
Key: exec.TaskKey + "_ok",
Data: &types.Struct{
Fields: map[string]*types.Value{
Expand All @@ -150,5 +161,6 @@ func main() {
}); err != nil {
log.Fatal(err)
}
log.Printf("emit " + exec.TaskKey + "_ok event\n")
}
}
36 changes: 19 additions & 17 deletions sdk/runner/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string)
}
defer os.RemoveAll(path)

// TODO: the ipfs url should be in config
resp, err := http.Get(ipfsEndpoint + srv.Source)
if err != nil {
return "", err
}
if resp.StatusCode != 200 {
return "", errors.New("service's source code is not reachable")
return "", errors.New("service's source code is not reachable, status: " + resp.Status + ", url: " + ipfsEndpoint + srv.Source)
}
defer resp.Body.Close()

Expand All @@ -52,10 +51,10 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string)
}

// Start starts the service.
func start(cont container.Container, srv *service.Service, instanceHash hash.Hash, imageHash string, env []string, engineName, port string) (serviceIDs []string, err error) {
func start(cont container.Container, srv *service.Service, instanceHash hash.Hash, runnerHash hash.Hash, imageHash string, env []string, engineName, port string) (serviceIDs []string, err error) {
endpoint := net.JoinHostPort(engineName, port)
instNamespace := instanceNamespace(instanceHash)
networkID, err := cont.CreateNetwork(instNamespace)
namespace := namespace(runnerHash)
networkID, err := cont.CreateNetwork(namespace)
if err != nil {
return nil, err
}
Expand All @@ -72,12 +71,13 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has
return nil, err
}
configs = append(configs, container.ServiceOptions{
Namespace: dependencyNamespace(instNamespace, d.Key),
Namespace: dependencyNamespace(namespace, d.Key),
Labels: map[string]string{
"mesg.engine": engineName,
"mesg.sid": srv.Sid,
"mesg.service": srv.Hash.String(),
"mesg.instance": instanceHash.String(),
"mesg.runner": runnerHash.String(),
"mesg.dependency": d.Key,
},
Image: d.Image,
Expand All @@ -100,12 +100,13 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has
return nil, err
}
configs = append(configs, container.ServiceOptions{
Namespace: dependencyNamespace(instNamespace, service.MainServiceKey),
Namespace: dependencyNamespace(namespace, service.MainServiceKey),
Labels: map[string]string{
"mesg.engine": engineName,
"mesg.sid": srv.Sid,
"mesg.service": srv.Hash.String(),
"mesg.instance": instanceHash.String(),
"mesg.runner": runnerHash.String(),
"mesg.dependency": service.MainServiceKey,
},
Image: imageHash,
Expand All @@ -114,6 +115,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has
Env: xos.EnvMergeSlices(env, []string{
"MESG_TOKEN=" + instanceHash.String(),
"MESG_INSTANCE_HASH=" + instanceHash.String(),
"MESG_RUNNER_HASH=" + runnerHash.String(),
"MESG_ENDPOINT=" + endpoint,
}),
Mounts: append(volumes, volumesFrom...),
Expand All @@ -129,7 +131,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has
for _, c := range configs {
serviceID, err := cont.StartService(c)
if err != nil {
stop(cont, instanceHash, srv.Dependencies)
stop(cont, runnerHash, srv.Dependencies)
return nil, err
}
serviceIDs = append(serviceIDs, serviceID)
Expand All @@ -139,18 +141,18 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has
}

// Stop stops an instance.
func stop(cont container.Container, instanceHash hash.Hash, dependencies []*service.Service_Dependency) error {
func stop(cont container.Container, runnerHash hash.Hash, dependencies []*service.Service_Dependency) error {
var (
wg sync.WaitGroup
errs xerrors.SyncErrors
sNamespace = instanceNamespace(instanceHash)
namespace = namespace(runnerHash)
namespaces = make([]string, 0)
)

for _, d := range dependencies {
namespaces = append(namespaces, dependencyNamespace(sNamespace, d.Key))
namespaces = append(namespaces, dependencyNamespace(namespace, d.Key))
}
namespaces = append(namespaces, dependencyNamespace(sNamespace, service.MainServiceKey))
namespaces = append(namespaces, dependencyNamespace(namespace, service.MainServiceKey))

for _, namespace := range namespaces {
wg.Add(1)
Expand All @@ -166,7 +168,7 @@ func stop(cont container.Container, instanceHash hash.Hash, dependencies []*serv
return err
}

return cont.DeleteNetwork(sNamespace)
return cont.DeleteNetwork(namespace)
}

// deleteData deletes the data volumes of instance and its dependencies.
Expand Down Expand Up @@ -201,14 +203,14 @@ func deleteData(cont container.Container, s *service.Service) error {
return errs.ErrorOrNil()
}

// instanceNamespace returns the namespace of the service.
func instanceNamespace(hash hash.Hash) string {
// namespace returns the namespace of the service.
func namespace(hash hash.Hash) string {
return hash.String()
}

// dependencyNamespace builds the namespace of a dependency.
func dependencyNamespace(instanceNamespace string, dependencyKey string) string {
return hash.Dump(instanceNamespace + dependencyKey).String()
func dependencyNamespace(namespace string, dependencyKey string) string {
return hash.Dump(namespace + dependencyKey).String()
}

func convertPorts(dPorts []string) []container.Port {
Expand Down
27 changes: 21 additions & 6 deletions sdk/runner/sdk.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package runnersdk

import (
"errors"

cosmostypes "github.com/cosmos/cosmos-sdk/types"
"github.com/mesg-foundation/engine/container"
"github.com/mesg-foundation/engine/cosmos"
Expand Down Expand Up @@ -67,32 +69,45 @@ func (s *SDK) Create(req *api.CreateRunnerRequest, accountName, accountPassword
}
instanceEnv := xos.EnvMergeSlices(srv.Configuration.Env, req.Env)
envHash := hash.Dump(instanceEnv)
// TODO: should be done by instance
// TODO: should be done by instance or runner
instanceHash := hash.Dump(&instance.Instance{
ServiceHash: srv.Hash,
EnvHash: envHash,
})
expRunnerHash := hash.Dump(&runner.Runner{
krhubert marked this conversation as resolved.
Show resolved Hide resolved
Address: user.String(),
InstanceHash: instanceHash,
})

// start the container
imageHash, err := build(s.container, srv, s.ipfsEndpoint)
if err != nil {
return nil, err
}
_, err = start(s.container, srv, instanceHash, imageHash, instanceEnv, s.engineName, s.port)
_, err = start(s.container, srv, instanceHash, expRunnerHash, imageHash, instanceEnv, s.engineName, s.port)
if err != nil {
return nil, err
}
onError := func() {
stop(s.container, instanceHash, srv.Dependencies)
stop(s.container, expRunnerHash, srv.Dependencies)
}

msg := newMsgCreateRunner(user, req.ServiceHash, envHash)
tx, err := s.client.BuildAndBroadcastMsg(msg, accountName, accountPassword)
if err != nil {
defer onError()
onError()
return nil, err
}
run, err := s.Get(tx.Data)
if err != nil {
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
onError()
return nil, err
}
return s.Get(tx.Data)
if !run.Hash.Equal(expRunnerHash) {
onError()
return nil, errors.New("calculated runner hash is not the same")
NicolasMahe marked this conversation as resolved.
Show resolved Hide resolved
}
return run, nil
}

// Delete deletes an existing runner.
Expand Down Expand Up @@ -130,7 +145,7 @@ func (s *SDK) Delete(req *api.DeleteRunnerRequest, accountName, accountPassword
}

// stop the local running service
if err := stop(s.container, inst.Hash, srv.Dependencies); err != nil {
if err := stop(s.container, runner.Hash, srv.Dependencies); err != nil {
return err
}

Expand Down