Skip to content

Commit

Permalink
Change execution.hash type to []byte
Browse files Browse the repository at this point in the history
  • Loading branch information
krhubert committed May 29, 2019
1 parent 5736b9a commit 2e6b2a0
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 52 deletions.
21 changes: 10 additions & 11 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,37 +108,36 @@ func (a *API) EmitEvent(token, eventKey string, eventData map[string]interface{}
}

// ExecuteTask executes a task tasKey with inputData and tags for service serviceID.
func (a *API) ExecuteTask(serviceID, taskKey string, inputData map[string]interface{},
tags []string) (executionHash string, err error) {
func (a *API) ExecuteTask(serviceID, taskKey string, inputData map[string]interface{}, tags []string) (executionHash []byte, err error) {
s, err := a.db.Get(serviceID)
if err != nil {
return "", err
return nil, err
}
// a task should be executed only if task's service is running.
status, err := a.m.Status(s)
if err != nil {
return "", err
return nil, err
}
if status != service.RUNNING {
return "", &NotRunningServiceError{ServiceID: s.Sid}
return nil, &NotRunningServiceError{ServiceID: s.Sid}
}

task, err := s.GetTask(taskKey)
if err != nil {
return "", err
return nil, err
}
if err := task.RequireInputs(inputData); err != nil {
return "", err
return nil, err
}

// execute the task.
eventID := uuid.NewV4().String()
exec := execution.New(s.Hash, "", eventID, taskKey, inputData, tags)
if err := exec.Execute(); err != nil {
return "", err
return nil, err
}
if err = a.execDB.Save(exec); err != nil {
return "", err
return nil, err
}
go a.ps.Pub(exec, executionSubTopic(s.Hash))
return exec.Hash, nil
Expand Down Expand Up @@ -192,7 +191,7 @@ func (a *API) ListenExecution(service string, f *ExecutionFilter) (*ExecutionLis
}

// SubmitResult submits results for executionHash.
func (a *API) SubmitResult(executionHash string, outputKey string, outputs []byte) error {
func (a *API) SubmitResult(executionHash []byte, outputKey string, outputs []byte) error {
exec, stateChanged, err := a.processExecution(executionHash, outputKey, outputs)
if stateChanged {
// only publish to listeners when the execution's state changed.
Expand All @@ -202,7 +201,7 @@ func (a *API) SubmitResult(executionHash string, outputKey string, outputs []byt
}

// processExecution processes execution and marks it as complated or failed.
func (a *API) processExecution(executionHash string, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) {
func (a *API) processExecution(executionHash []byte, outputKey string, outputData []byte) (exec *execution.Execution, stateChanged bool, err error) {
stateChanged = false
tx, err := a.execDB.OpenTransaction()
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions database/execution_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (

// ExecutionDB exposes all the functionalities
type ExecutionDB interface {
Find(executionHash string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
OpenTransaction() (ExecutionTransaction, error)
io.Closer
}

// ExecutionTransaction is the transaction handle.
type ExecutionTransaction interface {
Find(executionHash string) (*execution.Execution, error)
Find(executionHash []byte) (*execution.Execution, error)
Save(execution *execution.Execution) error
Commit() error
Discard()
Expand All @@ -41,7 +41,7 @@ func NewExecutionDB(path string) (*LevelDBExecutionDB, error) {
}

// Find the execution based on an executionHash, returns an error if not found
func (db *LevelDBExecutionDB) Find(executionHash string) (*execution.Execution, error) {
func (db *LevelDBExecutionDB) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(db.db, executionHash)
}

Expand Down Expand Up @@ -72,7 +72,7 @@ type LevelDBExecutionTransaction struct {
}

// Find the execution based on an executionHash, returns an error if not found
func (tx *LevelDBExecutionTransaction) Find(executionHash string) (*execution.Execution, error) {
func (tx *LevelDBExecutionTransaction) Find(executionHash []byte) (*execution.Execution, error) {
return executionFind(tx.tx, executionHash)
}

Expand All @@ -93,8 +93,8 @@ func (tx *LevelDBExecutionTransaction) Discard() {
}

// Find the execution based on an executionHash, returns an error if not found
func executionFind(db leveldbTxDB, executionHash string) (*execution.Execution, error) {
data, err := db.Get([]byte(executionHash), nil)
func executionFind(db leveldbTxDB, executionHash []byte) (*execution.Execution, error) {
data, err := db.Get(executionHash, nil)
if err != nil {
return nil, err
}
Expand All @@ -108,12 +108,12 @@ func executionFind(db leveldbTxDB, executionHash string) (*execution.Execution,
// Save an instance of executable in the database
// Returns an error if anything from marshaling to database saving goes wrong
func executionSave(db leveldbTxDB, execution *execution.Execution) error {
if execution.Hash == "" {
return errors.New("database: can't save service without id")
if len(execution.Hash) == 0 {
return errors.New("database: can't save execution without hash")
}
data, err := json.Marshal(execution)
if err != nil {
return err
}
return db.Put([]byte(execution.Hash), data, nil)
return db.Put(execution.Hash, data, nil)
}
8 changes: 4 additions & 4 deletions database/execution_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ func TestFind(t *testing.T) {
defer os.RemoveAll(dir)
db := db(t, dir)
defer db.Close()
e := &execution.Execution{Hash: "xxx"}
e := &execution.Execution{Hash: []byte{'1'}}
db.Save(e)
tests := []struct {
hash string
hash []byte
hasError bool
}{
{hash: e.Hash, hasError: false},
{hash: "doesn't exists", hasError: true},
{hash: []byte{1}, hasError: true},
}
for _, test := range tests {
execution, err := db.Find(test.hash)
Expand All @@ -52,7 +52,7 @@ func TestSave(t *testing.T) {
execution *execution.Execution
hasError bool
}{
{&execution.Execution{Hash: "xxx"}, false},
{&execution.Execution{Hash: []byte{'1'}}, false},
{&execution.Execution{}, true},
}
for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s Status) String() (r string) {

// Execution stores all informations about executions.
type Execution struct {
Hash string `hash:"-"`
Hash []byte `hash:"-"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
ServiceHash string `hash:"name:serviceHash"`
Expand Down
4 changes: 2 additions & 2 deletions execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func TestExecutionHash(t *testing.T) {

f := func(service, parentService, eventID, taskKey, input string, tags []string) bool {
e := New(service, parentService, eventID, taskKey, map[string]interface{}{"input": input}, tags)
if ids[e.Hash] {
if ids[string(e.Hash)] {
return false
}
ids[e.Hash] = true
ids[string(e.Hash)] = true
return true
}

Expand Down
4 changes: 2 additions & 2 deletions interface/grpc/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *Server) ListenResult(request *coreapi.ListenResultRequest, stream corea
return err
}
if err := stream.Send(&coreapi.ResultData{
ExecutionHash: execution.Hash,
ExecutionHash: string(execution.Hash),
TaskKey: execution.TaskKey,
OutputKey: execution.OutputKey,
OutputData: string(outputs),
Expand All @@ -192,7 +192,7 @@ func (s *Server) ExecuteTask(ctx context.Context, request *coreapi.ExecuteTaskRe

executionHash, err := s.api.ExecuteTask(request.ServiceID, request.TaskKey, inputs, request.ExecutionTags)
return &coreapi.ExecuteTaskReply{
ExecutionHash: executionHash,
ExecutionHash: string(executionHash),
}, err
}

Expand Down
4 changes: 2 additions & 2 deletions interface/grpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream servic
}

if err := stream.Send(&serviceapi.TaskData{
ExecutionHash: execution.Hash,
ExecutionHash: string(execution.Hash),
TaskKey: execution.TaskKey,
InputData: string(inputs),
}); err != nil {
Expand All @@ -69,5 +69,5 @@ func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream servic

// SubmitResult submits results of an execution.
func (s *Server) SubmitResult(context context.Context, request *serviceapi.SubmitResultRequest) (*serviceapi.SubmitResultReply, error) {
return &serviceapi.SubmitResultReply{}, s.api.SubmitResult(request.ExecutionHash, request.OutputKey, []byte(request.OutputData))
return &serviceapi.SubmitResultReply{}, s.api.SubmitResult([]byte(request.ExecutionHash), request.OutputKey, []byte(request.OutputData))
}
8 changes: 4 additions & 4 deletions interface/grpc/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestSubmit(t *testing.T) {
defer ln.Close()

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionHash: executionHash,
ExecutionHash: string(executionHash),
OutputKey: outputKey,
OutputData: outputData,
})
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestSubmitWithInvalidJSON(t *testing.T) {
require.NoError(t, err)

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionHash: executionHash,
ExecutionHash: string(executionHash),
OutputKey: outputKey,
OutputData: "",
})
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestSubmitWithNonExistentOutputKey(t *testing.T) {
require.NoError(t, err)

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionHash: executionHash,
ExecutionHash: string(executionHash),
OutputKey: outputKey,
OutputData: outputData,
})
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestSubmitWithInvalidTaskOutputs(t *testing.T) {
require.NoError(t, err)

_, err = server.SubmitResult(context.Background(), &serviceapi.SubmitResultRequest{
ExecutionHash: executionHash,
ExecutionHash: string(executionHash),
OutputKey: outputKey,
OutputData: outputData,
})
Expand Down
4 changes: 2 additions & 2 deletions service/manager/dockermanager/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ func extractVolumesFrom(s *service.Service, d *service.Dependency) ([]container.
// volumeKey creates a key for service's volume based on the sid to make sure that the volume
// will stay the same for different versions of the service.
func volumeKey(s *service.Service, dependency string, volume string) string {
return xstructhash.Hash([]string{
return string(xstructhash.Hash([]string{
s.Sid,
dependency,
volume,
}, 1)
}, 1))
}
10 changes: 4 additions & 6 deletions x/xstructhash/structhash.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package xstructhash

import (
"fmt"

"github.com/cnf/structhash"
)

// Hash takes a data structure and returns a hash string of that data structure
// Hash takes a data structure and returns a hash of that data structure
// at the version asked.
//
// This function uses md5 hashing function and default formatter. See also Dump()
// This function uses sha1 hashing function and default formatter. See also Dump()
// function.
func Hash(c interface{}, version int) string {
return fmt.Sprintf("%x", structhash.Sha1(c, version))
func Hash(c interface{}, version int) []byte {
return structhash.Sha1(c, version)
}
11 changes: 2 additions & 9 deletions x/xstructhash/structhash_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package xstructhash

import (
"crypto/sha1"
"fmt"
"testing"

"github.com/cnf/structhash"
)

func TestHash(t *testing.T) {
s := struct{}{}
v := 1
got := Hash(s, v)
want := fmt.Sprintf("%x", sha1.Sum(structhash.Dump(s, v)))
if got != want {
t.Errorf("invalid hash")
if got := fmt.Sprintf("%x", Hash(struct{}{}, 1)); got != "bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f" {
t.Errorf("hash dosen't match - got: %s", got)
}
}

0 comments on commit 2e6b2a0

Please sign in to comment.