Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add function events #310

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- BREAKING: `distributed` Skaffold profile and mentions in doc ([#319](https://github.com/Substra/orchestrator/pull/319))
- BREAKING: `chaincode-init` and `chaincode` Dockerfiles ([#319](https://github.com/Substra/orchestrator/pull/319))

### Added

- Enum `FailedAssetKind` ([#277](https://github.com/Substra/orchestrator/pull/277))
- BREAKING: Field `asset_type` of type `FailedAssetKind` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- BREAKING: Add `FunctionStatus` ([#263](https://github.com/Substra/orchestrator/pull/263))
- Add Function status event machine ([#263](https://github.com/Substra/orchestrator/pull/263))

### Changed

- Renamed `compute_task_key`by `asset_key` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277))

## [0.37.0](https://github.com/Substra/orchestrator/releases/tag/0.37.0) - 2023-10-18

### Added
Expand Down
2 changes: 1 addition & 1 deletion chaincode/failurereport/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SmartContract) GetFailureReport(ctx ledger.TransactionContext, wrapper
return nil, err
}

model, err := service.GetFailureReport(params.GetComputeTaskKey())
model, err := service.GetFailureReport(params.GetAssetKey())
if err != nil {
s.logger.Error().Err(err).Msg("failed to fetch failure report")
return nil, err
Expand Down
12 changes: 7 additions & 5 deletions chaincode/failurereport/contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func TestRegisterFailureReport(t *testing.T) {
mspid := "org"

newFailureReport := &asset.NewFailureReport{
ComputeTaskKey: "taskUUID",
LogsAddress: &asset.Addressable{},
AssetKey: "taskUUID",
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
LogsAddress: &asset.Addressable{},
}
wrapper, err := communication.Wrap(context.Background(), newFailureReport)
assert.NoError(t, err)
Expand All @@ -56,15 +57,16 @@ func TestGetFailureReport(t *testing.T) {
contract := &SmartContract{}

param := &asset.GetFailureReportParam{
ComputeTaskKey: "uuid",
AssetKey: "uuid",
}
wrapper, err := communication.Wrap(context.Background(), param)
assert.NoError(t, err)

ctx := new(ledger.MockTransactionContext)

failureReport := &asset.FailureReport{
ComputeTaskKey: param.ComputeTaskKey,
AssetKey: param.AssetKey,
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
}
mockService := getMockedService(ctx)
mockService.On("GetFailureReport", "uuid").Return(failureReport, nil).Once()
Expand All @@ -75,7 +77,7 @@ func TestGetFailureReport(t *testing.T) {
resp := new(asset.FailureReport)
err = wrapped.Unwrap(resp)
assert.NoError(t, err)
assert.Equal(t, resp.ComputeTaskKey, param.ComputeTaskKey)
assert.Equal(t, resp.AssetKey, param.AssetKey)

mockService.AssertExpectations(t)
}
29 changes: 29 additions & 0 deletions chaincode/function/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,35 @@ func (s *SmartContract) UpdateFunction(ctx ledger.TransactionContext, wrapper *c
return nil
}

func (s *SmartContract) ApplyFunctionAction(ctx ledger.TransactionContext, wrapper *communication.Wrapper) error {
provider, err := ctx.GetProvider()
if err != nil {
return err
}
service := provider.GetFunctionService()

params := new(asset.ApplyFunctionActionParam)
err = wrapper.Unwrap(params)
if err != nil {
s.logger.Error().Err(err).Msg("failed to unwrap param")
return err
}

requester, err := ledger.GetTxCreator(ctx.GetStub())
if err != nil {
s.logger.Error().Err(err).Msg("failed to extract tx creator")
return err
}

err = service.ApplyFunctionAction(params.FunctionKey, params.Action, "", requester)
if err != nil {
s.logger.Error().Err(err).Msg("failed to update function")
return err
}

return nil
}

// GetEvaluateTransactions returns functions of SmartContract not to be tagged as submit
func (s *SmartContract) GetEvaluateTransactions() []string {
return commonserv.ReadOnlyMethods["Function"]
Expand Down
1 change: 1 addition & 0 deletions chaincode/ledger/dbal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type storedAsset struct {
const computePlanTaskStatusIndex = "computePlan~computePlanKey~status~task"
const computeTaskParentIndex = "computeTask~parentTask~key"
const computeTaskChildIndex = "computeTask~childTask~key"
const computeTaskFunctionStatusIndex = "computeTask~functionKey~status~key"
const modelTaskKeyIndex = "model~taskKey~modelKey"
const performanceIndex = "performance~taskKey~metricKey"
const allOrganizationsIndex = "organizations~id"
Expand Down
36 changes: 36 additions & 0 deletions chaincode/ledger/dbal_computetask.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (db *DB) addComputeTask(t *asset.ComputeTask) error {
if err != nil {
return err
}

err = db.createIndex(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, t.FunctionKey, t.Status.String(), t.Key})
if err != nil {
return err
}
for _, parentTask := range service.GetParentTaskKeys(t.Inputs) {
err = db.createIndex(computeTaskParentIndex, []string{asset.ComputeTaskKind, parentTask, t.Key})
if err != nil {
Expand Down Expand Up @@ -95,6 +100,14 @@ func (db *DB) UpdateComputeTaskStatus(taskKey string, taskStatus asset.ComputeTa
if err != nil {
return err
}
err = db.updateIndex(
computeTaskFunctionStatusIndex,
[]string{asset.ComputeTaskKind, prevTask.FunctionKey, prevTask.Status.String(), prevTask.Key},
[]string{asset.ComputeTaskKind, updatedTask.FunctionKey, updatedTask.Status.String(), updatedTask.Key},
)
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -410,3 +423,26 @@ func (db *DB) GetComputeTaskOutputAssets(taskKey, identifier string) ([]*asset.C

return outputAssets, nil
}

func (db *DB) GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error) {
keysTodo, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskStatus_STATUS_TODO.String(), key})
if err != nil {
return nil, err
}

keysDoing, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskAction_TASK_ACTION_DOING.String(), key})
if err != nil {
return nil, err
}

tasks := []*asset.ComputeTask{}
for _, taskKey := range append(keysTodo, keysDoing...) {
task, err := db.GetComputeTask(taskKey)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}

return tasks, nil
}
10 changes: 5 additions & 5 deletions chaincode/ledger/dbal_failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) {
func (db *DB) GetFailureReport(assetKey string) (*asset.FailureReport, error) {
failureReport := new(asset.FailureReport)

b, err := db.getState(asset.FailureReportKind, computeTaskKey)
b, err := db.getState(asset.FailureReportKind, assetKey)
if err != nil {
return nil, err
}
Expand All @@ -22,17 +22,17 @@ func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, err
}

func (db *DB) AddFailureReport(failureReport *asset.FailureReport) error {
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetComputeTaskKey())
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetAssetKey())
if err != nil {
return err
}
if exists {
return errors.NewConflict(asset.FailureReportKind, failureReport.GetComputeTaskKey())
return errors.NewConflict(asset.FailureReportKind, failureReport.GetAssetKey())
}
bytes, err := marshaller.Marshal(failureReport)
if err != nil {
return err
}

return db.putState(asset.FailureReportKind, failureReport.GetComputeTaskKey(), bytes)
return db.putState(asset.FailureReportKind, failureReport.GetAssetKey(), bytes)
}
28 changes: 28 additions & 0 deletions docs/assets/function.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,31 @@ Function outputs must verify the following constraints:

- An output `kind` must be one of the following: `MODEL`, `PERFORMANCE`
- An output of kind `PERFORMANCE` cannot be `Multiple`


## Status

A function can have several status (see _States_ below for available transitions):

- WAITING: A function has been registered and is waiting to build.
- BUILDING: function is being build by the function owner.
- READY: function is ready to be used for compute task.
- FAILED: function build has failed.
- CANCELED: function has been cancelled.

## State

A compute task will go through different state during a compute plan execution.
This is an overview of a task's lifecycle:

![](./schemas/function.state.svg)

A task can be created in TODO or WAITING state depending on its parents.

During the ComputePlan execution, as tasks are DONE, their statuses will be reflected to their children.
If all the parents of a child task are DONE, this task enters TODO state.

When a parent task fails, children statuses are not changed.

A task may produce one or more [models](./model.md), they can only be registered when the task is in DOING.
This is to ensure that when a task starts (switch to DOING), all its inputs are available.
9 changes: 9 additions & 0 deletions docs/assets/schemas/function.state.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
stateDiagram-v2
WAITING --> BUILDING
WAITING --> CANCELED
WAITING --> FAILED
BUILDING --> FAILED
BUILDING --> READY
READY --> [*]
FAILED --> [*]
CANCELED --> [*]
Loading
Loading