Skip to content

Commit

Permalink
feat!: add function events (#310)
Browse files Browse the repository at this point in the history
## Details 
- Substra/substra-backend#714

Add function events, used now we decoupled the building of the function
with the execution of the compute task. For that it add a status field
on the `Function`. It also includes another PR (merged here), to have
functions build logs working again.

Fixes FL-1160

As this is going to be merged on a branch that is going to be merged to
a POC branch, we use MNIST as a baseline of a working model. We will
deal with failing tests on the POC before merging on main.

## Companion PR

* orchestrator: #310
* backend: Substra/substra-backend#756
* frontend: Substra/substra-frontend#240
* substra-generator: owkin/substra-generator#131

## Misc
- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

## Description

<!-- Please reference issue if any. -->

<!-- Please include a summary of your changes. -->

## How has this been tested?

<!-- Please describe the tests that you ran to verify your changes.  -->

## Checklist

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Guilhem Barthes <guilhem.barthes@owkin.com>
Signed-off-by: Guilhem Barthés <guilhem.barthes@owkin.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Signed-off-by: thbcmlowk <thbcmlowk@users.noreply.github.com>
Co-authored-by: Guilhem Barthés <guilhem.barthes@owkin.com>
Co-authored-by: guilhem-barthes <guilhem-barthes@users.noreply.github.com>
Co-authored-by: thbcmlowk <thbcmlowk@users.noreply.github.com>
  • Loading branch information
4 people committed Oct 25, 2023
1 parent 5dc63d4 commit 8568574
Show file tree
Hide file tree
Showing 43 changed files with 1,920 additions and 903 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- BREAKING: `distributed` Skaffold profile and mentions in doc ([#319](https://github.com/Substra/orchestrator/pull/319))
- BREAKING: `chaincode-init` and `chaincode` Dockerfiles ([#319](https://github.com/Substra/orchestrator/pull/319))

### Added

- Enum `FailedAssetKind` ([#277](https://github.com/Substra/orchestrator/pull/277))
- BREAKING: Field `asset_type` of type `FailedAssetKind` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- BREAKING: Add `FunctionStatus` ([#263](https://github.com/Substra/orchestrator/pull/263))
- Add Function status event machine ([#263](https://github.com/Substra/orchestrator/pull/263))

### Changed

- Renamed `compute_task_key`by `asset_key` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277))

## [0.37.0](https://github.com/Substra/orchestrator/releases/tag/0.37.0) - 2023-10-18

### Added
Expand Down
2 changes: 1 addition & 1 deletion chaincode/failurereport/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SmartContract) GetFailureReport(ctx ledger.TransactionContext, wrapper
return nil, err
}

model, err := service.GetFailureReport(params.GetComputeTaskKey())
model, err := service.GetFailureReport(params.GetAssetKey())
if err != nil {
s.logger.Error().Err(err).Msg("failed to fetch failure report")
return nil, err
Expand Down
12 changes: 7 additions & 5 deletions chaincode/failurereport/contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func TestRegisterFailureReport(t *testing.T) {
mspid := "org"

newFailureReport := &asset.NewFailureReport{
ComputeTaskKey: "taskUUID",
LogsAddress: &asset.Addressable{},
AssetKey: "taskUUID",
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
LogsAddress: &asset.Addressable{},
}
wrapper, err := communication.Wrap(context.Background(), newFailureReport)
assert.NoError(t, err)
Expand All @@ -56,15 +57,16 @@ func TestGetFailureReport(t *testing.T) {
contract := &SmartContract{}

param := &asset.GetFailureReportParam{
ComputeTaskKey: "uuid",
AssetKey: "uuid",
}
wrapper, err := communication.Wrap(context.Background(), param)
assert.NoError(t, err)

ctx := new(ledger.MockTransactionContext)

failureReport := &asset.FailureReport{
ComputeTaskKey: param.ComputeTaskKey,
AssetKey: param.AssetKey,
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
}
mockService := getMockedService(ctx)
mockService.On("GetFailureReport", "uuid").Return(failureReport, nil).Once()
Expand All @@ -75,7 +77,7 @@ func TestGetFailureReport(t *testing.T) {
resp := new(asset.FailureReport)
err = wrapped.Unwrap(resp)
assert.NoError(t, err)
assert.Equal(t, resp.ComputeTaskKey, param.ComputeTaskKey)
assert.Equal(t, resp.AssetKey, param.AssetKey)

mockService.AssertExpectations(t)
}
29 changes: 29 additions & 0 deletions chaincode/function/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,35 @@ func (s *SmartContract) UpdateFunction(ctx ledger.TransactionContext, wrapper *c
return nil
}

func (s *SmartContract) ApplyFunctionAction(ctx ledger.TransactionContext, wrapper *communication.Wrapper) error {
provider, err := ctx.GetProvider()
if err != nil {
return err
}
service := provider.GetFunctionService()

params := new(asset.ApplyFunctionActionParam)
err = wrapper.Unwrap(params)
if err != nil {
s.logger.Error().Err(err).Msg("failed to unwrap param")
return err
}

requester, err := ledger.GetTxCreator(ctx.GetStub())
if err != nil {
s.logger.Error().Err(err).Msg("failed to extract tx creator")
return err
}

err = service.ApplyFunctionAction(params.FunctionKey, params.Action, "", requester)
if err != nil {
s.logger.Error().Err(err).Msg("failed to update function")
return err
}

return nil
}

// GetEvaluateTransactions returns functions of SmartContract not to be tagged as submit
func (s *SmartContract) GetEvaluateTransactions() []string {
return commonserv.ReadOnlyMethods["Function"]
Expand Down
1 change: 1 addition & 0 deletions chaincode/ledger/dbal.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type storedAsset struct {
const computePlanTaskStatusIndex = "computePlan~computePlanKey~status~task"
const computeTaskParentIndex = "computeTask~parentTask~key"
const computeTaskChildIndex = "computeTask~childTask~key"
const computeTaskFunctionStatusIndex = "computeTask~functionKey~status~key"
const modelTaskKeyIndex = "model~taskKey~modelKey"
const performanceIndex = "performance~taskKey~metricKey"
const allOrganizationsIndex = "organizations~id"
Expand Down
36 changes: 36 additions & 0 deletions chaincode/ledger/dbal_computetask.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (db *DB) addComputeTask(t *asset.ComputeTask) error {
if err != nil {
return err
}

err = db.createIndex(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, t.FunctionKey, t.Status.String(), t.Key})
if err != nil {
return err
}
for _, parentTask := range service.GetParentTaskKeys(t.Inputs) {
err = db.createIndex(computeTaskParentIndex, []string{asset.ComputeTaskKind, parentTask, t.Key})
if err != nil {
Expand Down Expand Up @@ -95,6 +100,14 @@ func (db *DB) UpdateComputeTaskStatus(taskKey string, taskStatus asset.ComputeTa
if err != nil {
return err
}
err = db.updateIndex(
computeTaskFunctionStatusIndex,
[]string{asset.ComputeTaskKind, prevTask.FunctionKey, prevTask.Status.String(), prevTask.Key},
[]string{asset.ComputeTaskKind, updatedTask.FunctionKey, updatedTask.Status.String(), updatedTask.Key},
)
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -410,3 +423,26 @@ func (db *DB) GetComputeTaskOutputAssets(taskKey, identifier string) ([]*asset.C

return outputAssets, nil
}

func (db *DB) GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error) {
keysTodo, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskStatus_STATUS_TODO.String(), key})
if err != nil {
return nil, err
}

keysDoing, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskAction_TASK_ACTION_DOING.String(), key})
if err != nil {
return nil, err
}

tasks := []*asset.ComputeTask{}
for _, taskKey := range append(keysTodo, keysDoing...) {
task, err := db.GetComputeTask(taskKey)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}

return tasks, nil
}
10 changes: 5 additions & 5 deletions chaincode/ledger/dbal_failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) {
func (db *DB) GetFailureReport(assetKey string) (*asset.FailureReport, error) {
failureReport := new(asset.FailureReport)

b, err := db.getState(asset.FailureReportKind, computeTaskKey)
b, err := db.getState(asset.FailureReportKind, assetKey)
if err != nil {
return nil, err
}
Expand All @@ -22,17 +22,17 @@ func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, err
}

func (db *DB) AddFailureReport(failureReport *asset.FailureReport) error {
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetComputeTaskKey())
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetAssetKey())
if err != nil {
return err
}
if exists {
return errors.NewConflict(asset.FailureReportKind, failureReport.GetComputeTaskKey())
return errors.NewConflict(asset.FailureReportKind, failureReport.GetAssetKey())
}
bytes, err := marshaller.Marshal(failureReport)
if err != nil {
return err
}

return db.putState(asset.FailureReportKind, failureReport.GetComputeTaskKey(), bytes)
return db.putState(asset.FailureReportKind, failureReport.GetAssetKey(), bytes)
}
28 changes: 28 additions & 0 deletions docs/assets/function.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,31 @@ Function outputs must verify the following constraints:

- An output `kind` must be one of the following: `MODEL`, `PERFORMANCE`
- An output of kind `PERFORMANCE` cannot be `Multiple`


## Status

A function can have several status (see _States_ below for available transitions):

- WAITING: A function has been registered and is waiting to build.
- BUILDING: function is being build by the function owner.
- READY: function is ready to be used for compute task.
- FAILED: function build has failed.
- CANCELED: function has been cancelled.

## State

A compute task will go through different state during a compute plan execution.
This is an overview of a task's lifecycle:

![](./schemas/function.state.svg)

A task can be created in TODO or WAITING state depending on its parents.

During the ComputePlan execution, as tasks are DONE, their statuses will be reflected to their children.
If all the parents of a child task are DONE, this task enters TODO state.

When a parent task fails, children statuses are not changed.

A task may produce one or more [models](./model.md), they can only be registered when the task is in DOING.
This is to ensure that when a task starts (switch to DOING), all its inputs are available.
9 changes: 9 additions & 0 deletions docs/assets/schemas/function.state.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
stateDiagram-v2
WAITING --> BUILDING
WAITING --> CANCELED
WAITING --> FAILED
BUILDING --> FAILED
BUILDING --> READY
READY --> [*]
FAILED --> [*]
CANCELED --> [*]
Loading

0 comments on commit 8568574

Please sign in to comment.