From 843bb4f513fc32e9ef0c0971710833a96dcab500 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Barth=C3=A9s?= Date: Tue, 19 Sep 2023 09:50:57 +0200 Subject: [PATCH 1/4] feat: add function event (#263) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - https://github.com/Substra/substra-backend/pull/714 Add function events, used now we decoupled the building of the function with the execution of the compute task. For that it add a status field on the `Function`. It also includes another PR (merged here), to have functions build logs working again. Fixes FL-1160 As this is going to be merged on a branch that is going to be merged to a POC branch, we use MNIST as a baseline of a working model. We will deal with failing tests on the POC before merging on main. - [x] [changelog](../CHANGELOG.md) was updated with notable changes - [ ] documentation was updated --------- Signed-off-by: Guilhem Barthes Signed-off-by: Guilhem Barthés Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: guilhem-barthes --- CHANGELOG.md | 12 ++ chaincode/failurereport/contract.go | 2 +- chaincode/failurereport/contract_test.go | 12 +- chaincode/function/contract.go | 29 +++ chaincode/ledger/dbal.go | 1 + chaincode/ledger/dbal_computetask.go | 36 ++++ chaincode/ledger/dbal_failure.go | 10 +- docs/assets/function.md | 28 +++ docs/assets/schemas/function.state.mmd | 9 + docs/schemas/standalone-database.svg | 178 ++++++++--------- e2e/client/client.go | 41 +++- e2e/failure_report_test.go | 6 +- lib/asset/event_test.go | 2 +- lib/asset/failure_report.proto | 22 ++- lib/asset/failure_report_validation.go | 3 +- lib/asset/failure_report_validation_test.go | 57 +++--- lib/asset/function.proto | 27 +++ lib/asset/function_validation.go | 12 ++ lib/asset/function_validation_test.go | 38 ++++ lib/asset/sql.go | 46 +++++ lib/asset/sql_test.go | 14 ++ lib/persistence/computetask_dbal.go | 1 + lib/persistence/failure_report_dbal.go | 2 +- lib/service/computetask.go | 1 + lib/service/computetaskstate.go | 23 +++ lib/service/computetaskstate_test.go | 30 +++ lib/service/failure_report.go | 91 ++++++--- lib/service/failure_report_test.go | 103 ++++++++-- lib/service/function.go | 4 + lib/service/function_test.go | 1 + lib/service/functionstate.go | 180 ++++++++++++++++++ lib/service/functionstate_test.go | 146 ++++++++++++++ server/distributed/adapters/failure_report.go | 2 +- server/distributed/adapters/function.go | 15 ++ server/standalone/dbal/computetask.go | 41 ++++ server/standalone/dbal/failure_report.go | 36 ++-- server/standalone/dbal/failure_report_test.go | 6 +- server/standalone/dbal/function.go | 17 +- server/standalone/dbal/function_test.go | 16 +- server/standalone/handlers/failure_report.go | 2 +- server/standalone/handlers/function.go | 18 ++ .../000058_add-function-status.up.sql | 40 ++++ .../000059_modify-failure-report.up.sql | 37 ++++ 43 files changed, 1181 insertions(+), 216 deletions(-) create mode 100644 docs/assets/schemas/function.state.mmd create mode 100644 lib/service/functionstate.go create mode 100644 lib/service/functionstate_test.go create mode 100644 server/standalone/migration/000058_add-function-status.up.sql create mode 100644 server/standalone/migration/000059_modify-failure-report.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index fbfe6fc7..bb91f8ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - BREAKING: `distributed` Skaffold profile and mentions in doc ([#319](https://github.com/Substra/orchestrator/pull/319)) - BREAKING: `chaincode-init` and `chaincode` Dockerfiles ([#319](https://github.com/Substra/orchestrator/pull/319)) +### Added + +- Enum `FailedAssetKind` ([#277](https://github.com/Substra/orchestrator/pull/277)) +- BREAKING: Field `asset_type` of type `FailedAssetKind` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277)) +- BREAKING: Add `FunctionStatus` ([#263](https://github.com/Substra/orchestrator/pull/263)) +- Add Function status event machine ([#263](https://github.com/Substra/orchestrator/pull/263)) + +### Changed + +- Renamed `compute_task_key`by `asset_key` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277)) +- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277)) + ## [0.37.0](https://github.com/Substra/orchestrator/releases/tag/0.37.0) - 2023-10-18 ### Added diff --git a/chaincode/failurereport/contract.go b/chaincode/failurereport/contract.go index 8772717c..eda09baa 100644 --- a/chaincode/failurereport/contract.go +++ b/chaincode/failurereport/contract.go @@ -77,7 +77,7 @@ func (s *SmartContract) GetFailureReport(ctx ledger.TransactionContext, wrapper return nil, err } - model, err := service.GetFailureReport(params.GetComputeTaskKey()) + model, err := service.GetFailureReport(params.GetAssetKey()) if err != nil { s.logger.Error().Err(err).Msg("failed to fetch failure report") return nil, err diff --git a/chaincode/failurereport/contract_test.go b/chaincode/failurereport/contract_test.go index 0f7c6a77..fe6583eb 100644 --- a/chaincode/failurereport/contract_test.go +++ b/chaincode/failurereport/contract_test.go @@ -30,8 +30,9 @@ func TestRegisterFailureReport(t *testing.T) { mspid := "org" newFailureReport := &asset.NewFailureReport{ - ComputeTaskKey: "taskUUID", - LogsAddress: &asset.Addressable{}, + AssetKey: "taskUUID", + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + LogsAddress: &asset.Addressable{}, } wrapper, err := communication.Wrap(context.Background(), newFailureReport) assert.NoError(t, err) @@ -56,7 +57,7 @@ func TestGetFailureReport(t *testing.T) { contract := &SmartContract{} param := &asset.GetFailureReportParam{ - ComputeTaskKey: "uuid", + AssetKey: "uuid", } wrapper, err := communication.Wrap(context.Background(), param) assert.NoError(t, err) @@ -64,7 +65,8 @@ func TestGetFailureReport(t *testing.T) { ctx := new(ledger.MockTransactionContext) failureReport := &asset.FailureReport{ - ComputeTaskKey: param.ComputeTaskKey, + AssetKey: param.AssetKey, + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, } mockService := getMockedService(ctx) mockService.On("GetFailureReport", "uuid").Return(failureReport, nil).Once() @@ -75,7 +77,7 @@ func TestGetFailureReport(t *testing.T) { resp := new(asset.FailureReport) err = wrapped.Unwrap(resp) assert.NoError(t, err) - assert.Equal(t, resp.ComputeTaskKey, param.ComputeTaskKey) + assert.Equal(t, resp.AssetKey, param.AssetKey) mockService.AssertExpectations(t) } diff --git a/chaincode/function/contract.go b/chaincode/function/contract.go index 5a7db3ab..6486d696 100644 --- a/chaincode/function/contract.go +++ b/chaincode/function/contract.go @@ -159,6 +159,35 @@ func (s *SmartContract) UpdateFunction(ctx ledger.TransactionContext, wrapper *c return nil } +func (s *SmartContract) ApplyFunctionAction(ctx ledger.TransactionContext, wrapper *communication.Wrapper) error { + provider, err := ctx.GetProvider() + if err != nil { + return err + } + service := provider.GetFunctionService() + + params := new(asset.ApplyFunctionActionParam) + err = wrapper.Unwrap(params) + if err != nil { + s.logger.Error().Err(err).Msg("failed to unwrap param") + return err + } + + requester, err := ledger.GetTxCreator(ctx.GetStub()) + if err != nil { + s.logger.Error().Err(err).Msg("failed to extract tx creator") + return err + } + + err = service.ApplyFunctionAction(params.FunctionKey, params.Action, "", requester) + if err != nil { + s.logger.Error().Err(err).Msg("failed to update function") + return err + } + + return nil +} + // GetEvaluateTransactions returns functions of SmartContract not to be tagged as submit func (s *SmartContract) GetEvaluateTransactions() []string { return commonserv.ReadOnlyMethods["Function"] diff --git a/chaincode/ledger/dbal.go b/chaincode/ledger/dbal.go index 85ab350c..05160f2f 100644 --- a/chaincode/ledger/dbal.go +++ b/chaincode/ledger/dbal.go @@ -65,6 +65,7 @@ type storedAsset struct { const computePlanTaskStatusIndex = "computePlan~computePlanKey~status~task" const computeTaskParentIndex = "computeTask~parentTask~key" const computeTaskChildIndex = "computeTask~childTask~key" +const computeTaskFunctionStatusIndex = "computeTask~functionKey~status~key" const modelTaskKeyIndex = "model~taskKey~modelKey" const performanceIndex = "performance~taskKey~metricKey" const allOrganizationsIndex = "organizations~id" diff --git a/chaincode/ledger/dbal_computetask.go b/chaincode/ledger/dbal_computetask.go index 333d23db..99020cae 100644 --- a/chaincode/ledger/dbal_computetask.go +++ b/chaincode/ledger/dbal_computetask.go @@ -39,6 +39,11 @@ func (db *DB) addComputeTask(t *asset.ComputeTask) error { if err != nil { return err } + + err = db.createIndex(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, t.FunctionKey, t.Status.String(), t.Key}) + if err != nil { + return err + } for _, parentTask := range service.GetParentTaskKeys(t.Inputs) { err = db.createIndex(computeTaskParentIndex, []string{asset.ComputeTaskKind, parentTask, t.Key}) if err != nil { @@ -95,6 +100,14 @@ func (db *DB) UpdateComputeTaskStatus(taskKey string, taskStatus asset.ComputeTa if err != nil { return err } + err = db.updateIndex( + computeTaskFunctionStatusIndex, + []string{asset.ComputeTaskKind, prevTask.FunctionKey, prevTask.Status.String(), prevTask.Key}, + []string{asset.ComputeTaskKind, updatedTask.FunctionKey, updatedTask.Status.String(), updatedTask.Key}, + ) + if err != nil { + return err + } } return nil } @@ -410,3 +423,26 @@ func (db *DB) GetComputeTaskOutputAssets(taskKey, identifier string) ([]*asset.C return outputAssets, nil } + +func (db *DB) GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error) { + keysTodo, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskStatus_STATUS_TODO.String(), key}) + if err != nil { + return nil, err + } + + keysDoing, err := db.getIndexKeys(computeTaskFunctionStatusIndex, []string{asset.ComputeTaskKind, asset.ComputeTaskAction_TASK_ACTION_DOING.String(), key}) + if err != nil { + return nil, err + } + + tasks := []*asset.ComputeTask{} + for _, taskKey := range append(keysTodo, keysDoing...) { + task, err := db.GetComputeTask(taskKey) + if err != nil { + return nil, err + } + tasks = append(tasks, task) + } + + return tasks, nil +} diff --git a/chaincode/ledger/dbal_failure.go b/chaincode/ledger/dbal_failure.go index 60a61188..b902ba50 100644 --- a/chaincode/ledger/dbal_failure.go +++ b/chaincode/ledger/dbal_failure.go @@ -6,10 +6,10 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) { +func (db *DB) GetFailureReport(assetKey string) (*asset.FailureReport, error) { failureReport := new(asset.FailureReport) - b, err := db.getState(asset.FailureReportKind, computeTaskKey) + b, err := db.getState(asset.FailureReportKind, assetKey) if err != nil { return nil, err } @@ -22,17 +22,17 @@ func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, err } func (db *DB) AddFailureReport(failureReport *asset.FailureReport) error { - exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetComputeTaskKey()) + exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetAssetKey()) if err != nil { return err } if exists { - return errors.NewConflict(asset.FailureReportKind, failureReport.GetComputeTaskKey()) + return errors.NewConflict(asset.FailureReportKind, failureReport.GetAssetKey()) } bytes, err := marshaller.Marshal(failureReport) if err != nil { return err } - return db.putState(asset.FailureReportKind, failureReport.GetComputeTaskKey(), bytes) + return db.putState(asset.FailureReportKind, failureReport.GetAssetKey(), bytes) } diff --git a/docs/assets/function.md b/docs/assets/function.md index 64bc2329..dd577544 100644 --- a/docs/assets/function.md +++ b/docs/assets/function.md @@ -37,3 +37,31 @@ Function outputs must verify the following constraints: - An output `kind` must be one of the following: `MODEL`, `PERFORMANCE` - An output of kind `PERFORMANCE` cannot be `Multiple` + + +## Status + +A function can have several status (see _States_ below for available transitions): + +- WAITING: A function has been registered and is waiting to build. +- BUILDING: function is being build by the function owner. +- READY: function is ready to be used for compute task. +- FAILED: function build has failed. +- CANCELED: function has been cancelled. + +## State + +A compute task will go through different state during a compute plan execution. +This is an overview of a task's lifecycle: + +![](./schemas/function.state.svg) + +A task can be created in TODO or WAITING state depending on its parents. + +During the ComputePlan execution, as tasks are DONE, their statuses will be reflected to their children. +If all the parents of a child task are DONE, this task enters TODO state. + +When a parent task fails, children statuses are not changed. + +A task may produce one or more [models](./model.md), they can only be registered when the task is in DOING. +This is to ensure that when a task starts (switch to DOING), all its inputs are available. \ No newline at end of file diff --git a/docs/assets/schemas/function.state.mmd b/docs/assets/schemas/function.state.mmd new file mode 100644 index 00000000..c3640061 --- /dev/null +++ b/docs/assets/schemas/function.state.mmd @@ -0,0 +1,9 @@ +stateDiagram-v2 + WAITING --> BUILDING + WAITING --> CANCELED + WAITING --> FAILED + BUILDING --> FAILED + BUILDING --> READY + READY --> [*] + FAILED --> [*] + CANCELED --> [*] \ No newline at end of file diff --git a/docs/schemas/standalone-database.svg b/docs/schemas/standalone-database.svg index e80eefa4..d512bca7 100644 --- a/docs/schemas/standalone-database.svg +++ b/docs/schemas/standalone-database.svg @@ -79,45 +79,48 @@ public.functions - - -public.functions -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -name -[varchar(100)] - -description -[varchar(200)] - -functionaddress -[varchar(200)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] + + +public.functions +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +name +[varchar(100)] + +description +[varchar(200)] + +functionaddress +[varchar(200)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +status +[function_status] public.functions:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) @@ -137,16 +140,16 @@ public.functions:functionaddress->public.addressables:storage_address - - -FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) + + +FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) public.functions:description->public.addressables:storage_address - - -FOREIGN KEY (description) REFERENCES addressables(storage_address) + + +FOREIGN KEY (description) REFERENCES addressables(storage_address) @@ -195,14 +198,14 @@ public.datamanagers:owner->public.organizations:id -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.datamanagers:description->public.addressables:storage_address -FOREIGN KEY (description) REFERENCES addressables(storage_address) +FOREIGN KEY (description) REFERENCES addressables(storage_address) @@ -270,7 +273,7 @@ public.compute_tasks:function_key->public.functions:key - + FOREIGN KEY (function_key) REFERENCES functions(key) @@ -753,8 +756,8 @@ public.function_inputs:function_key->public.functions:key - - + + FOREIGN KEY (function_key) REFERENCES functions(key) @@ -788,8 +791,8 @@ public.function_outputs:function_key->public.functions:key - - + + FOREIGN KEY (function_key) REFERENCES functions(key) @@ -948,44 +951,47 @@ public.expanded_functions - - -public.expanded_functions -     -[VIEW] - -key -[uuid] - -name -[varchar(100)] - -description_address -[varchar(200)] - -description_checksum -[varchar(64)] - -function_address -[varchar(200)] - -function_checksum -[varchar(64)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] - -channel -[varchar(100)] + + +public.expanded_functions +     +[VIEW] + +key +[uuid] + +name +[varchar(100)] + +description_address +[varchar(200)] + +description_checksum +[varchar(64)] + +function_address +[varchar(200)] + +function_checksum +[varchar(64)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +channel +[varchar(100)] + +status +[function_status] diff --git a/e2e/client/client.go b/e2e/client/client.go index 969f3266..77ddfbc0 100644 --- a/e2e/client/client.go +++ b/e2e/client/client.go @@ -553,6 +553,34 @@ func (c *TestClient) UpdateFunction(functionRef string, name string) *asset.Upda return resp } +func (c *TestClient) BuildFunction(keyRef string) { + c.applyFunctionAction(keyRef, asset.FunctionAction_FUNCTION_ACTION_BUILDING) +} + +func (c *TestClient) CancelFunction(keyRef string) { + c.applyFunctionAction(keyRef, asset.FunctionAction_FUNCTION_ACTION_CANCELED) +} + +func (c *TestClient) FailFunction(keyRef string) { + c.applyFunctionAction(keyRef, asset.FunctionAction_FUNCTION_ACTION_FAILED) +} + +func (c *TestClient) SetReadyFunction(keyRef string) { + c.applyFunctionAction(keyRef, asset.FunctionAction_FUNCTION_ACTION_READY) +} + +func (c *TestClient) applyFunctionAction(keyRef string, action asset.FunctionAction) { + functionKey := c.ks.GetKey(keyRef) + c.logger.Debug().Str("functionKey", functionKey).Str("action", action.String()).Msg("applying function action") + _, err := c.functionService.ApplyFunctionAction(c.ctx, &asset.ApplyFunctionActionParam{ + FunctionKey: functionKey, + Action: action, + }) + if err != nil { + c.logger.Fatal().Err(err).Msgf("failed to mark function as %v", action) + } +} + func (c *TestClient) QueryEvents(filter *asset.EventQueryFilter, pageToken string, pageSize int) *asset.QueryEventsResponse { resp, err := c.eventService.QueryEvents(c.ctx, &asset.QueryEventsParam{Filter: filter, PageToken: pageToken, PageSize: uint32(pageSize)}) if err != nil { @@ -581,10 +609,11 @@ func (c *TestClient) QueryPlans(filter *asset.PlanQueryFilter, pageToken string, return resp } -func (c *TestClient) RegisterFailureReport(taskRef string) *asset.FailureReport { +func (c *TestClient) RegisterTaskFailureReport(assetRef string) *asset.FailureReport { newFailureReport := &asset.NewFailureReport{ - ComputeTaskKey: c.ks.GetKey(taskRef), - ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, + AssetKey: c.ks.GetKey(assetRef), + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, LogsAddress: &asset.Addressable{ Checksum: "5e12e1a2687d81b268558217856547f8a4519f9688933351386a7f902cf1ce5d", StorageAddress: "http://somewhere.local/failure/" + uuid.NewString(), @@ -600,12 +629,12 @@ func (c *TestClient) RegisterFailureReport(taskRef string) *asset.FailureReport return failureReport } -func (c *TestClient) GetFailureReport(taskRef string) *asset.FailureReport { +func (c *TestClient) GetFailureReport(assetRef string) *asset.FailureReport { param := &asset.GetFailureReportParam{ - ComputeTaskKey: c.ks.GetKey(taskRef), + AssetKey: c.ks.GetKey(assetRef), } - c.logger.Debug().Str("task key", param.ComputeTaskKey).Msg("getting failure report") + c.logger.Debug().Str("asset key", param.AssetKey).Msg("getting failure report") failureReport, err := c.failureReportService.GetFailureReport(c.ctx, param) if err != nil { c.logger.Fatal().Err(err).Msg("GetFailureReport failed") diff --git a/e2e/failure_report_test.go b/e2e/failure_report_test.go index 3511c546..1717edda 100644 --- a/e2e/failure_report_test.go +++ b/e2e/failure_report_test.go @@ -25,17 +25,17 @@ func TestRegisterFailureReport(t *testing.T) { appClient.StartTask(client.DefaultTrainTaskRef) - registeredFailureReport := appClient.RegisterFailureReport(client.DefaultTrainTaskRef) + registeredFailureReport := appClient.RegisterTaskFailureReport(client.DefaultTrainTaskRef) task := appClient.GetComputeTask(client.DefaultTrainTaskRef) - require.Equal(t, task.Key, registeredFailureReport.ComputeTaskKey) + require.Equal(t, task.Key, registeredFailureReport.AssetKey) require.Equal(t, asset.ComputeTaskStatus_STATUS_FAILED, task.Status) retrievedFailureReport := appClient.GetFailureReport(client.DefaultTrainTaskRef) e2erequire.ProtoEqual(t, registeredFailureReport, retrievedFailureReport) eventResp := appClient.QueryEvents(&asset.EventQueryFilter{ - AssetKey: registeredFailureReport.ComputeTaskKey, + AssetKey: registeredFailureReport.AssetKey, AssetKind: asset.AssetKind_ASSET_FAILURE_REPORT, EventKind: asset.EventKind_EVENT_ASSET_CREATED, }, "", 100) diff --git a/lib/asset/event_test.go b/lib/asset/event_test.go index 7f1ac1f5..f8b0f978 100644 --- a/lib/asset/event_test.go +++ b/lib/asset/event_test.go @@ -31,7 +31,7 @@ func TestMarshalUnmarshalEventAsset(t *testing.T) { }, "failureReport": { AssetKind: AssetKind_ASSET_FAILURE_REPORT, - Asset: &Event_FailureReport{FailureReport: &FailureReport{ComputeTaskKey: "failed-task"}}, + Asset: &Event_FailureReport{FailureReport: &FailureReport{AssetKey: "failed-task"}}, }, "model": { AssetKind: AssetKind_ASSET_MODEL, diff --git a/lib/asset/failure_report.proto b/lib/asset/failure_report.proto index 231a0c31..dad011e6 100644 --- a/lib/asset/failure_report.proto +++ b/lib/asset/failure_report.proto @@ -24,30 +24,40 @@ enum ErrorType { ERROR_TYPE_INTERNAL = 3; } -// FailureReport is used to store information related to a failed ComputeTask. +enum FailedAssetKind { + FAILED_ASSET_UNKNOWN = 0; + FAILED_ASSET_COMPUTE_TASK = 1; + FAILED_ASSET_FUNCTION = 2; +} + +// FailureReport is used to store information related to a failed ComputeTask or Function builds. message FailureReport { - string compute_task_key = 1; + string asset_key = 1; ErrorType error_type = 2; Addressable logs_address = 3; google.protobuf.Timestamp creation_date = 4; - // The owner of a failure report matches the 'worker' field of the associated compute task but can differ from + // In the case of a compute task failure, the owner of a failure report matches the 'worker' field of the associated compute task but can differ from // the owner of the compute task. Indeed, a task belonging to some user can be executed on an organization belonging - // to another user. The failure report generated will be located on the execution organization and belong to the owner + // to another user. + // In the case of a function, the owner will be the owner of the function (which builds the function). + // The failure report generated will be located on the execution organization and belong to the owner // of this organization. string owner = 5; + FailedAssetKind asset_type = 6; } // NewFailureReport is used to register a FailureReport. // It will be processed into a FailureReport. message NewFailureReport { - string compute_task_key = 1; + string asset_key = 1; ErrorType error_type = 2; Addressable logs_address = 3; + FailedAssetKind asset_type = 4; } // GetFailureReportParam is used to fetch a Failure. message GetFailureReportParam { - string compute_task_key = 1; + string asset_key = 1; } service FailureReportService { diff --git a/lib/asset/failure_report_validation.go b/lib/asset/failure_report_validation.go index 01d6784a..8d3f2cb4 100644 --- a/lib/asset/failure_report_validation.go +++ b/lib/asset/failure_report_validation.go @@ -9,8 +9,9 @@ import ( // Validate returns an error if the new FailureReport object is not valid. func (f *NewFailureReport) Validate() error { return validation.ValidateStruct(f, - validation.Field(&f.ComputeTaskKey, validation.Required, is.UUID), + validation.Field(&f.AssetKey, validation.Required, is.UUID), validation.Field(&f.ErrorType, validation.In(ErrorType_ERROR_TYPE_BUILD, ErrorType_ERROR_TYPE_EXECUTION, ErrorType_ERROR_TYPE_INTERNAL)), + validation.Field(&f.AssetType, validation.In(FailedAssetKind_FAILED_ASSET_FUNCTION, FailedAssetKind_FAILED_ASSET_COMPUTE_TASK)), validation.Field(&f.LogsAddress, validation.When(utils.SliceContains([]ErrorType{ErrorType_ERROR_TYPE_EXECUTION, ErrorType_ERROR_TYPE_BUILD}, f.ErrorType), validation.Required).Else(validation.Nil)), ) } diff --git a/lib/asset/failure_report_validation_test.go b/lib/asset/failure_report_validation_test.go index 8928ba11..6cef9058 100644 --- a/lib/asset/failure_report_validation_test.go +++ b/lib/asset/failure_report_validation_test.go @@ -19,40 +19,53 @@ func TestFailureReportValidate(t *testing.T) { cases := map[string]failureReportTestCase{ "empty": {&NewFailureReport{}, false}, - "invalidComputeTaskKey": {&NewFailureReport{ - ComputeTaskKey: "notUUID", - ErrorType: ErrorType_ERROR_TYPE_BUILD, - LogsAddress: nil, + "invalidAssetKeyFunction": {&NewFailureReport{ + AssetKey: "notUUID", + AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION, + ErrorType: ErrorType_ERROR_TYPE_BUILD, + LogsAddress: nil, + }, false}, + "invalidAssetKeyComputeTask": {&NewFailureReport{ + AssetKey: "notUUID", + AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: ErrorType_ERROR_TYPE_BUILD, + LogsAddress: nil, }, false}, "validBuildError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_BUILD, - LogsAddress: validAddressable, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION, + ErrorType: ErrorType_ERROR_TYPE_BUILD, + LogsAddress: validAddressable, }, true}, "invalidBuildError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_BUILD, - LogsAddress: nil, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION, + ErrorType: ErrorType_ERROR_TYPE_BUILD, + LogsAddress: nil, }, false}, "validExecutionError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_EXECUTION, - LogsAddress: validAddressable, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: ErrorType_ERROR_TYPE_EXECUTION, + LogsAddress: validAddressable, }, true}, "invalidExecutionError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_EXECUTION, - LogsAddress: nil, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: ErrorType_ERROR_TYPE_EXECUTION, + LogsAddress: nil, }, false}, "validInternalError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_INTERNAL, - LogsAddress: nil, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: ErrorType_ERROR_TYPE_INTERNAL, + LogsAddress: nil, }, true}, "invalidInternalError": {&NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: ErrorType_ERROR_TYPE_INTERNAL, - LogsAddress: validAddressable, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: ErrorType_ERROR_TYPE_INTERNAL, + LogsAddress: validAddressable, }, false}, } diff --git a/lib/asset/function.proto b/lib/asset/function.proto index 550c0436..c42c8be0 100644 --- a/lib/asset/function.proto +++ b/lib/asset/function.proto @@ -18,6 +18,23 @@ message FunctionOutput { bool multiple = 2; } +enum FunctionAction { + FUNCTION_ACTION_UNKNOWN = 0; + FUNCTION_ACTION_BUILDING = 1; + FUNCTION_ACTION_CANCELED = 2; + FUNCTION_ACTION_FAILED = 3; + FUNCTION_ACTION_READY = 4; +} + +enum FunctionStatus { + FUNCTION_STATUS_UNKNOWN = 0; + FUNCTION_STATUS_WAITING = 1; + FUNCTION_STATUS_BUILDING = 2; + FUNCTION_STATUS_READY = 3; + FUNCTION_STATUS_CANCELED = 4; + FUNCTION_STATUS_FAILED = 5; +} + // Function represents the code which will be used // to produce or test a model. message Function { @@ -34,6 +51,7 @@ message Function { map metadata = 16; map inputs = 17; map outputs = 18; + FunctionStatus status = 19; } // NewFunction is used to register an Function. @@ -83,4 +101,13 @@ service FunctionService { rpc GetFunction(GetFunctionParam) returns (Function); rpc QueryFunctions(QueryFunctionsParam) returns (QueryFunctionsResponse); rpc UpdateFunction(UpdateFunctionParam) returns (UpdateFunctionResponse); + rpc ApplyFunctionAction(ApplyFunctionActionParam) returns (ApplyFunctionActionResponse); + +} + +message ApplyFunctionActionParam { + string function_key = 1; + FunctionAction action = 2; } + +message ApplyFunctionActionResponse {} \ No newline at end of file diff --git a/lib/asset/function_validation.go b/lib/asset/function_validation.go index ddb516a9..2c0584c6 100644 --- a/lib/asset/function_validation.go +++ b/lib/asset/function_validation.go @@ -102,3 +102,15 @@ func validateOutputs(input interface{}) error { return nil } + +func (p *ApplyFunctionActionParam) Validate() error { + return validation.ValidateStruct(p, + validation.Field(&p.FunctionKey, validation.Required, is.UUID), + validation.Field(&p.Action, validation.Required, validation.In( + FunctionAction_FUNCTION_ACTION_BUILDING, + FunctionAction_FUNCTION_ACTION_FAILED, + FunctionAction_FUNCTION_ACTION_CANCELED, + FunctionAction_FUNCTION_ACTION_READY, + )), + ) +} diff --git a/lib/asset/function_validation_test.go b/lib/asset/function_validation_test.go index 5e9165b1..e18a5f40 100644 --- a/lib/asset/function_validation_test.go +++ b/lib/asset/function_validation_test.go @@ -156,3 +156,41 @@ func TestUpdateFunctionValidate(t *testing.T) { } } } +func TestApplyFunctionActionParam(t *testing.T) { + empty := &ApplyFunctionActionParam{} + valid := &ApplyFunctionActionParam{ + FunctionKey: "972bef4c-1b42-4743-bbe9-cc3f4a69952f", + Action: FunctionAction_FUNCTION_ACTION_BUILDING, + } + missingKey := &ApplyFunctionActionParam{ + Action: FunctionAction_FUNCTION_ACTION_BUILDING, + } + missingAction := &ApplyFunctionActionParam{ + FunctionKey: "972bef4c-1b42-4743-bbe9-cc3f4a69952f", + } + invalidAction := &ApplyFunctionActionParam{ + FunctionKey: "972bef4c-1b42-4743-bbe9-cc3f4a69952f", + Action: FunctionAction_FUNCTION_ACTION_UNKNOWN, + } + + cases := map[string]struct { + valid bool + param *ApplyFunctionActionParam + }{ + "valid": {valid: true, param: valid}, + "empty": {valid: false, param: empty}, + "missing key": {valid: false, param: missingKey}, + "missing action": {valid: false, param: missingAction}, + "invalid action": {valid: false, param: invalidAction}, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + if c.valid { + assert.NoError(t, c.param.Validate()) + } else { + assert.Error(t, c.param.Validate()) + } + }) + } +} diff --git a/lib/asset/sql.go b/lib/asset/sql.go index 78d1f27b..17b46bce 100644 --- a/lib/asset/sql.go +++ b/lib/asset/sql.go @@ -132,3 +132,49 @@ func (k *EventKind) Scan(value interface{}) error { return nil } + +// Value implements the driver.Valuer interface. +// Simply returns the string representation of the FunctionStatus. +func (fs *FunctionStatus) Value() (driver.Value, error) { + return fs.String(), nil +} + +// Scan implements the sql.Scanner interface. +// Simply decodes a string into the FunctionStatus. +func (fs *FunctionStatus) Scan(value interface{}) error { + s, ok := value.(string) + if !ok { + return errors.NewInternal("cannot scan function status: invalid string") + } + + v, ok := FunctionStatus_value[s] + if !ok { + return errors.NewInternal("cannot scan function status: unknown value") + } + *fs = FunctionStatus(v) + + return nil +} + +// Value implements the driver.Valuer interface. +// Simply returns the string representation of the FunctionStatus. +func (fak *FailedAssetKind) Value() (driver.Value, error) { + return fak.String(), nil +} + +// Scan implements the sql.Scanner interface. +// Simply decodes a string into the FunctionStatus. +func (fak *FailedAssetKind) Scan(value interface{}) error { + s, ok := value.(string) + if !ok { + return errors.NewInternal("cannot scan failed asset kind: invalid string") + } + + v, ok := FailedAssetKind_value[s] + if !ok { + return errors.NewInternal("cannot scan failed asset kind: unknown value") + } + *fak = FailedAssetKind(v) + + return nil +} diff --git a/lib/asset/sql_test.go b/lib/asset/sql_test.go index d17119df..12b4dfed 100644 --- a/lib/asset/sql_test.go +++ b/lib/asset/sql_test.go @@ -90,3 +90,17 @@ func TestEventKindValue(t *testing.T) { assert.Equal(t, kind, scanned) } + +func TestFailedAssetKindKindValue(t *testing.T) { + k := FailedAssetKind_FAILED_ASSET_UNKNOWN + kind := &k + + value, err := kind.Value() + assert.NoError(t, err, "failed asset kind serialization should not fail") + + scanned := new(FailedAssetKind) + err = scanned.Scan(value) + assert.NoError(t, err, "failed asset kind scan should not fail") + + assert.Equal(t, kind, scanned) +} diff --git a/lib/persistence/computetask_dbal.go b/lib/persistence/computetask_dbal.go index 642a23ee..0abc3485 100644 --- a/lib/persistence/computetask_dbal.go +++ b/lib/persistence/computetask_dbal.go @@ -21,6 +21,7 @@ type ComputeTaskDBAL interface { // GetComputePlanTasks returns the tasks of the compute plan identified by the given key GetComputePlanTasks(key string) ([]*asset.ComputeTask, error) GetComputePlanTasksKeys(key string) ([]string, error) + GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error) AddComputeTaskOutputAsset(output *asset.ComputeTaskOutputAsset) error // CountComputeTaskRegisteredOutputs returns the number of registered outputs by identifier CountComputeTaskRegisteredOutputs(key string) (ComputeTaskOutputCounter, error) diff --git a/lib/persistence/failure_report_dbal.go b/lib/persistence/failure_report_dbal.go index 56006d2d..388439a5 100644 --- a/lib/persistence/failure_report_dbal.go +++ b/lib/persistence/failure_report_dbal.go @@ -5,7 +5,7 @@ import ( ) type FailureReportDBAL interface { - GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) + GetFailureReport(assetKey string) (*asset.FailureReport, error) AddFailureReport(f *asset.FailureReport) error } diff --git a/lib/service/computetask.go b/lib/service/computetask.go index 69fc5c46..12fb3a9e 100644 --- a/lib/service/computetask.go +++ b/lib/service/computetask.go @@ -37,6 +37,7 @@ type ComputeTaskAPI interface { applyTaskAction(task *asset.ComputeTask, action taskTransition, reason string) error addComputeTaskOutputAsset(output *asset.ComputeTaskOutputAsset) error getTaskOutputCounter(taskKey string) (persistence.ComputeTaskOutputCounter, error) + propagateFunctionCancelation(functionKey string, requester string) error } // ComputeTaskServiceProvider defines an object able to provide a ComputeTaskAPI instance diff --git a/lib/service/computetaskstate.go b/lib/service/computetaskstate.go index e637c189..e7121cea 100644 --- a/lib/service/computetaskstate.go +++ b/lib/service/computetaskstate.go @@ -339,3 +339,26 @@ func updateAllowed(task *asset.ComputeTask, action asset.ComputeTaskAction, requ return false } } + +func (s *ComputeTaskService) propagateFunctionCancelation(functionKey string, requester string) error { + tasks, err := s.GetComputeTaskDBAL().GetFunctionRunnableTasks(functionKey) + + if err != nil { + return err + } + + for _, task := range tasks { + // We bypass the `requester` check as we checked the requester on the function state machine. + err := s.ApplyTaskAction(task.Key, asset.ComputeTaskAction_TASK_ACTION_FAILED, "Function building failed", task.Worker) + if err != nil { + s.GetLogger().Error(). + Err(err). + Str("functionKey", functionKey). + Str("taskKey", task.Key). + Msg("failed to propagate task action when applying function action") + return err + } + } + + return nil +} diff --git a/lib/service/computetaskstate_test.go b/lib/service/computetaskstate_test.go index 3d6b11ba..ea9ec545 100644 --- a/lib/service/computetaskstate_test.go +++ b/lib/service/computetaskstate_test.go @@ -262,3 +262,33 @@ func TestUpdateAllowed(t *testing.T) { }) } } + +func TestPropagateFunctionCancelation(t *testing.T) { + dbal := new(persistence.MockDBAL) + es := new(MockEventAPI) + cps := new(MockComputePlanAPI) + provider := newMockedProvider() + service := NewComputeTaskService(provider) + + provider.On("GetComputeTaskDBAL").Return(dbal) + provider.On("GetEventService").Return(es) + provider.On("GetComputePlanService").Return(cps) + + functionKey := "uuid_f" + task := &asset.ComputeTask{Key: "uuid_t", Status: asset.ComputeTaskStatus_STATUS_TODO, Owner: "owner", Worker: "worker"} + + cps.On("failPlan", mock.Anything).Return(nil) + dbal.On("GetFunctionRunnableTasks", functionKey).Return([]*asset.ComputeTask{task}, nil) + dbal.On("GetComputeTask", task.Key).Return(task, nil) + dbal.On("UpdateComputeTaskStatus", task.Key, asset.ComputeTaskStatus_STATUS_FAILED).Return(nil) + es.On("RegisterEvents", mock.Anything).Return(nil) + + err := service.propagateFunctionCancelation(functionKey, "owner") + + assert.NoError(t, err) + + cps.AssertExpectations(t) + dbal.AssertExpectations(t) + es.AssertExpectations(t) + provider.AssertExpectations(t) +} diff --git a/lib/service/failure_report.go b/lib/service/failure_report.go index 057675d2..2b87fd91 100644 --- a/lib/service/failure_report.go +++ b/lib/service/failure_report.go @@ -11,7 +11,7 @@ import ( type FailureReportAPI interface { RegisterFailureReport(failure *asset.NewFailureReport, owner string) (*asset.FailureReport, error) - GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) + GetFailureReport(assetKey string) (*asset.FailureReport, error) } type FailureReportServiceProvider interface { @@ -22,6 +22,7 @@ type FailureReportDependencyProvider interface { LoggerProvider persistence.FailureReportDBALProvider ComputeTaskServiceProvider + FunctionServiceProvider EventServiceProvider TimeServiceProvider } @@ -41,31 +42,26 @@ func (s *FailureReportService) RegisterFailureReport(newFailureReport *asset.New if err != nil { return nil, errors.FromValidationError(asset.FailureReportKind, err) } - - task, err := s.GetComputeTaskService().GetTask(newFailureReport.ComputeTaskKey) - if err != nil { - return nil, err - } - - if task.Worker != requester { - return nil, errors.NewPermissionDenied(fmt.Sprintf("only %q worker can register failure report", task.Worker)) + switch newFailureReport.AssetType { + case asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK: + err = s.processTaskFailure(newFailureReport.AssetKey, requester) + case asset.FailedAssetKind_FAILED_ASSET_FUNCTION: + err = s.processFunctionFailure(newFailureReport.AssetKey, requester) + default: + return nil, errors.NewBadRequest("can only register failure for asset_kind values function and compute task") } - if task.Status != asset.ComputeTaskStatus_STATUS_DOING { - return nil, errors.NewBadRequest(fmt.Sprintf("cannot register failure report for task with status %q", task.Status.String())) - } - - err = s.GetComputeTaskService().ApplyTaskAction(task.Key, asset.ComputeTaskAction_TASK_ACTION_FAILED, "failure report registered", requester) if err != nil { return nil, err } failureReport := &asset.FailureReport{ - ComputeTaskKey: newFailureReport.ComputeTaskKey, - ErrorType: newFailureReport.ErrorType, - LogsAddress: newFailureReport.LogsAddress, - CreationDate: timestamppb.New(s.GetTimeService().GetTransactionTime()), - Owner: requester, + AssetKey: newFailureReport.AssetKey, + AssetType: newFailureReport.AssetType, + ErrorType: newFailureReport.ErrorType, + LogsAddress: newFailureReport.LogsAddress, + CreationDate: timestamppb.New(s.GetTimeService().GetTransactionTime()), + Owner: requester, } err = s.GetFailureReportDBAL().AddFailureReport(failureReport) @@ -75,7 +71,7 @@ func (s *FailureReportService) RegisterFailureReport(newFailureReport *asset.New event := &asset.Event{ EventKind: asset.EventKind_EVENT_ASSET_CREATED, - AssetKey: failureReport.ComputeTaskKey, + AssetKey: failureReport.AssetKey, AssetKind: asset.AssetKind_ASSET_FAILURE_REPORT, Asset: &asset.Event_FailureReport{FailureReport: failureReport}, } @@ -87,7 +83,56 @@ func (s *FailureReportService) RegisterFailureReport(newFailureReport *asset.New return failureReport, nil } -func (s *FailureReportService) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) { - s.GetLogger().Debug().Str("computeTaskKey", computeTaskKey).Msg("Get failure report") - return s.GetFailureReportDBAL().GetFailureReport(computeTaskKey) +func (s *FailureReportService) GetFailureReport(assetKey string) (*asset.FailureReport, error) { + s.GetLogger().Debug().Str("assetKey", assetKey).Msg("Get failure report") + return s.GetFailureReportDBAL().GetFailureReport(assetKey) +} + +func checkTaskPermissions(task *asset.ComputeTask, requester string) error { + if task.Worker != requester { + return errors.NewPermissionDenied(fmt.Sprintf("only %q worker can register failure report for compute task", task.Worker)) + } + + if task.Status != asset.ComputeTaskStatus_STATUS_DOING { + return errors.NewBadRequest(fmt.Sprintf("cannot register failure report for task with status %q", task.Status.String())) + } + + return nil +} + +func (s *FailureReportService) processTaskFailure(taskKey string, requester string) error { + task, err := s.GetComputeTaskService().GetTask(taskKey) + if err != nil { + return err + } + + err = checkTaskPermissions(task, requester) + if err != nil { + return err + } + + return s.GetComputeTaskService().ApplyTaskAction(taskKey, asset.ComputeTaskAction_TASK_ACTION_FAILED, "failure report registered", requester) +} + +func checkFunctionPermissions(function *asset.Function, requester string) error { + if function.Owner != requester { + return errors.NewPermissionDenied(fmt.Sprintf("only %q owner can register failure report for function", function.Owner)) + } + + return nil +} + +func (s *FailureReportService) processFunctionFailure(functionKey string, requester string) error { + function, err := s.GetFunctionService().GetFunction(functionKey) + if err != nil { + return err + } + + err = checkFunctionPermissions(function, requester) + + if err != nil { + return err + } + + return s.GetFunctionService().ApplyFunctionAction(functionKey, asset.FunctionAction_FUNCTION_ACTION_FAILED, "failure report registered", requester) } diff --git a/lib/service/failure_report_test.go b/lib/service/failure_report_test.go index 1b41d0a0..c9aa3ae5 100644 --- a/lib/service/failure_report_test.go +++ b/lib/service/failure_report_test.go @@ -13,7 +13,7 @@ import ( orcerrors "github.com/substra/orchestrator/lib/errors" ) -func TestRegisterFailureReport(t *testing.T) { +func TestRegisterComputeTaskFailureReport(t *testing.T) { taskService := new(MockComputeTaskAPI) failureReportDBAL := new(persistence.MockFailureReportDBAL) eventService := new(MockEventAPI) @@ -29,34 +29,36 @@ func TestRegisterFailureReport(t *testing.T) { timeService.On("GetTransactionTime").Once().Return(transactionTime) newFailureReport := &asset.NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, LogsAddress: &asset.Addressable{ StorageAddress: "https://somewhere", Checksum: "f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2", }, } - taskService.On("GetTask", newFailureReport.ComputeTaskKey).Once().Return(&asset.ComputeTask{ - Key: newFailureReport.ComputeTaskKey, + taskService.On("GetTask", newFailureReport.AssetKey).Once().Return(&asset.ComputeTask{ + Key: newFailureReport.AssetKey, Status: asset.ComputeTaskStatus_STATUS_DOING, Worker: "test", }, nil) - taskService.On("ApplyTaskAction", newFailureReport.ComputeTaskKey, asset.ComputeTaskAction_TASK_ACTION_FAILED, "failure report registered", "test").Once().Return(nil) + taskService.On("ApplyTaskAction", newFailureReport.AssetKey, asset.ComputeTaskAction_TASK_ACTION_FAILED, "failure report registered", "test").Once().Return(nil) storedFailureReport := &asset.FailureReport{ - ComputeTaskKey: newFailureReport.ComputeTaskKey, - ErrorType: newFailureReport.ErrorType, - LogsAddress: newFailureReport.LogsAddress, - CreationDate: timestamppb.New(transactionTime), - Owner: "test", + AssetKey: newFailureReport.AssetKey, + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: newFailureReport.ErrorType, + LogsAddress: newFailureReport.LogsAddress, + CreationDate: timestamppb.New(transactionTime), + Owner: "test", } failureReportDBAL.On("AddFailureReport", storedFailureReport).Once().Return(nil) event := &asset.Event{ EventKind: asset.EventKind_EVENT_ASSET_CREATED, - AssetKey: newFailureReport.ComputeTaskKey, + AssetKey: newFailureReport.AssetKey, AssetKind: asset.AssetKind_ASSET_FAILURE_REPORT, Asset: &asset.Event_FailureReport{FailureReport: storedFailureReport}, } @@ -64,7 +66,7 @@ func TestRegisterFailureReport(t *testing.T) { failureReport, err := service.RegisterFailureReport(newFailureReport, "test") assert.NoError(t, err) - assert.Equal(t, failureReport.ComputeTaskKey, newFailureReport.ComputeTaskKey) + assert.Equal(t, failureReport.AssetKey, newFailureReport.AssetKey) taskService.AssertExpectations(t) failureReportDBAL.AssertExpectations(t) @@ -73,6 +75,68 @@ func TestRegisterFailureReport(t *testing.T) { provider.AssertExpectations(t) } +func TestRegisterFunctionFailureReport(t *testing.T) { + functionService := new(MockFunctionAPI) + failureReportDBAL := new(persistence.MockFailureReportDBAL) + eventService := new(MockEventAPI) + timeService := new(MockTimeAPI) + provider := newMockedProvider() + provider.On("GetFunctionService").Return(functionService) + provider.On("GetFailureReportDBAL").Return(failureReportDBAL) + provider.On("GetEventService").Return(eventService) + provider.On("GetTimeService").Return(timeService) + service := NewFailureReportService(provider) + + transactionTime := time.Unix(1337, 0) + timeService.On("GetTransactionTime").Once().Return(transactionTime) + + newFailureReport := &asset.NewFailureReport{ + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: asset.FailedAssetKind_FAILED_ASSET_FUNCTION, + ErrorType: asset.ErrorType_ERROR_TYPE_BUILD, + LogsAddress: &asset.Addressable{ + StorageAddress: "https://somewhere", + Checksum: "f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2", + }, + } + + functionService.On("GetFunction", newFailureReport.AssetKey).Once().Return(&asset.Function{ + Key: newFailureReport.AssetKey, + Status: asset.FunctionStatus_FUNCTION_STATUS_BUILDING, + Owner: "test", + }, nil) + + functionService.On("ApplyFunctionAction", newFailureReport.AssetKey, asset.FunctionAction_FUNCTION_ACTION_FAILED, "failure report registered", "test").Once().Return(nil) + + storedFailureReport := &asset.FailureReport{ + AssetKey: newFailureReport.AssetKey, + AssetType: asset.FailedAssetKind_FAILED_ASSET_FUNCTION, + ErrorType: newFailureReport.ErrorType, + LogsAddress: newFailureReport.LogsAddress, + CreationDate: timestamppb.New(transactionTime), + Owner: "test", + } + failureReportDBAL.On("AddFailureReport", storedFailureReport).Once().Return(nil) + + event := &asset.Event{ + EventKind: asset.EventKind_EVENT_ASSET_CREATED, + AssetKey: newFailureReport.AssetKey, + AssetKind: asset.AssetKind_ASSET_FAILURE_REPORT, + Asset: &asset.Event_FailureReport{FailureReport: storedFailureReport}, + } + eventService.On("RegisterEvents", event).Once().Return(nil) + + failureReport, err := service.RegisterFailureReport(newFailureReport, "test") + assert.NoError(t, err) + assert.Equal(t, failureReport.AssetKey, newFailureReport.AssetKey) + + functionService.AssertExpectations(t) + failureReportDBAL.AssertExpectations(t) + eventService.AssertExpectations(t) + timeService.AssertExpectations(t) + provider.AssertExpectations(t) +} + func TestRegisterFailureOnFailedTask(t *testing.T) { taskService := new(MockComputeTaskAPI) provider := newMockedProvider() @@ -80,15 +144,16 @@ func TestRegisterFailureOnFailedTask(t *testing.T) { service := NewFailureReportService(provider) newFailureReport := &asset.NewFailureReport{ - ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", - ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, + AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532", + AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK, + ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION, LogsAddress: &asset.Addressable{ StorageAddress: "https://somewhere", Checksum: "f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2", }, } - taskService.On("GetTask", newFailureReport.ComputeTaskKey).Once().Return(&asset.ComputeTask{ + taskService.On("GetTask", newFailureReport.AssetKey).Once().Return(&asset.ComputeTask{ Status: asset.ComputeTaskStatus_STATUS_FAILED, Worker: "test", }, nil) @@ -112,12 +177,12 @@ func TestGetFailure(t *testing.T) { service := NewFailureReportService(provider) failureReport := &asset.FailureReport{ - ComputeTaskKey: "uuid", + AssetKey: "uuid", } - dbal.On("GetFailureReport", failureReport.ComputeTaskKey).Once().Return(failureReport, nil) + dbal.On("GetFailureReport", failureReport.AssetKey).Once().Return(failureReport, nil) - ret, err := service.GetFailureReport(failureReport.ComputeTaskKey) + ret, err := service.GetFailureReport(failureReport.AssetKey) assert.NoError(t, err) assert.Equal(t, failureReport, ret) diff --git a/lib/service/function.go b/lib/service/function.go index d8d77a10..ece3a8fe 100644 --- a/lib/service/function.go +++ b/lib/service/function.go @@ -17,6 +17,8 @@ type FunctionAPI interface { CanDownload(key string, requester string) (bool, error) FunctionExists(key string) (bool, error) UpdateFunction(function *asset.UpdateFunctionParam, requester string) error + ApplyFunctionAction(key string, action asset.FunctionAction, reason string, requester string) error + applyFunctionAction(function *asset.Function, action functionTransition, reason string) error } // FunctionServiceProvider defines an object able to provide an FunctionAPI instance @@ -29,6 +31,7 @@ type FunctionDependencyProvider interface { LoggerProvider persistence.FunctionDBALProvider EventServiceProvider + ComputeTaskServiceProvider PermissionServiceProvider TimeServiceProvider } @@ -70,6 +73,7 @@ func (s *FunctionService) RegisterFunction(a *asset.NewFunction, owner string) ( CreationDate: timestamppb.New(s.GetTimeService().GetTransactionTime()), Inputs: a.Inputs, Outputs: a.Outputs, + Status: asset.FunctionStatus_FUNCTION_STATUS_WAITING, } function.Permissions, err = s.GetPermissionService().CreatePermissions(owner, a.NewPermissions) diff --git a/lib/service/function_test.go b/lib/service/function_test.go index 6d228077..2ffcc7c8 100644 --- a/lib/service/function_test.go +++ b/lib/service/function_test.go @@ -60,6 +60,7 @@ func TestRegisterFunction(t *testing.T) { Permissions: perms, Owner: "owner", CreationDate: timestamppb.New(time.Unix(1337, 0)), + Status: asset.FunctionStatus_FUNCTION_STATUS_WAITING, } dbal.On("AddFunction", storedFunction).Return(nil).Once() diff --git a/lib/service/functionstate.go b/lib/service/functionstate.go new file mode 100644 index 00000000..82aeb22d --- /dev/null +++ b/lib/service/functionstate.go @@ -0,0 +1,180 @@ +package service + +import ( + "context" + "fmt" + + "github.com/looplab/fsm" + "github.com/substra/orchestrator/lib/asset" + orcerrors "github.com/substra/orchestrator/lib/errors" +) + +type functionTransition string + +const ( + transitionFunctionBuilding functionTransition = "transitionBuilding" + transitionFunctionReady functionTransition = "transitionReady" + transitionFunctionCanceled functionTransition = "transitionCanceled" + transitionFunctionFailed functionTransition = "transitionFailed" +) + +// functionStateEvents is the definition of the state machine representing function states +var functionStateEvents = fsm.Events{ + { + Name: string(transitionFunctionCanceled), + Src: []string{asset.FunctionStatus_FUNCTION_STATUS_WAITING.String(), asset.FunctionStatus_FUNCTION_STATUS_BUILDING.String()}, + Dst: asset.FunctionStatus_FUNCTION_STATUS_CANCELED.String(), + }, + { + Name: string(transitionFunctionBuilding), + Src: []string{asset.FunctionStatus_FUNCTION_STATUS_WAITING.String()}, + Dst: asset.FunctionStatus_FUNCTION_STATUS_BUILDING.String(), + }, + { + Name: string(transitionFunctionReady), + Src: []string{asset.FunctionStatus_FUNCTION_STATUS_BUILDING.String()}, + Dst: asset.FunctionStatus_FUNCTION_STATUS_READY.String(), + }, + { + Name: string(transitionFunctionFailed), + Src: []string{asset.FunctionStatus_FUNCTION_STATUS_BUILDING.String()}, + Dst: asset.FunctionStatus_FUNCTION_STATUS_FAILED.String(), + }, +} + +// functionStateUpdater defines a structure capable of handling function updates +type functionStateUpdater interface { + // On state change will receive the ORIGINAL (before transition) function as first argument + // and the transition reason as second argument + // any error should be registered as e.Err + onStateChange(e *fsm.Event) + // Set the compute plan to failed when a function fails. + onFailure(e *fsm.Event) +} + +func newFunctionState(updater functionStateUpdater, function *asset.Function) *fsm.FSM { + return fsm.NewFSM( + function.Status.String(), + functionStateEvents, + fsm.Callbacks{ + "enter_state": wrapFsmCallbackContext(updater.onStateChange), + "after_transitionFailed": wrapFsmCallbackContext(updater.onFailure), + }, + ) +} + +// ApplyFunctionAction apply an asset.FunctionStatus to the function. +func (s *FunctionService) ApplyFunctionAction(key string, action asset.FunctionAction, reason string, requester string) error { + var transition functionTransition + switch action { + case asset.FunctionAction_FUNCTION_ACTION_BUILDING: + transition = transitionFunctionBuilding + case asset.FunctionAction_FUNCTION_ACTION_CANCELED: + transition = transitionFunctionCanceled + case asset.FunctionAction_FUNCTION_ACTION_FAILED: + transition = transitionFunctionFailed + case asset.FunctionAction_FUNCTION_ACTION_READY: + transition = transitionFunctionReady + default: + return orcerrors.NewBadRequest("unsupported action") + } + + if reason == "" { + reason = "User action" + } + + function, err := s.GetFunctionDBAL().GetFunction(key) + if err != nil { + return err + } + if requester != function.Owner { + return orcerrors.NewPermissionDenied("only function owner can update it") + } + + return s.applyFunctionAction(function, transition, reason) +} + +// applyFunctionAction is the internal method allowing any transition (string). +// This allows to use this method with internal only transitions (abort). +func (s *FunctionService) applyFunctionAction(function *asset.Function, action functionTransition, reason string) error { + s.GetLogger().Debug().Str("functionKey", function.Key).Str("action", string(action)).Str("reason", reason).Msg("Applying function action") + state := newFunctionState(s, function) + err := state.Event(context.Background(), string(action), function, reason) + + return err +} + +// onStateChange enqueue an orchestration event and saves the function +func (s *FunctionService) onStateChange(e *fsm.Event) { + if len(e.Args) != 2 { + e.Err = orcerrors.NewInternal(fmt.Sprintf("cannot handle state change with argument: %v", e.Args)) + return + } + function, ok := e.Args[0].(*asset.Function) + if !ok { + e.Err = orcerrors.NewInternal("cannot cast argument into function") + return + } + reason, ok := e.Args[1].(string) + if !ok { + e.Err = orcerrors.NewInternal(fmt.Sprintf("cannot cast into string: %v", e.Args[1])) + return + } + + statusVal, ok := asset.FunctionStatus_value[e.Dst] + if !ok { + // This should not happen since state codes are string representation of statuses + e.Err = orcerrors.NewInternal(fmt.Sprintf("unknown function status %q", e.Dst)) + return + } + function.Status = asset.FunctionStatus(statusVal) + + s.GetLogger().Debug(). + Str("functionKey", function.Key). + Str("newStatus", function.Status.String()). + Str("functionOwner", function.Owner). + Str("reason", reason). + Msg("Updating function status") + + err := s.GetFunctionDBAL().UpdateFunction(function) + if err != nil { + e.Err = err + return + } + + event := &asset.Event{ + EventKind: asset.EventKind_EVENT_ASSET_UPDATED, + AssetKey: function.Key, + AssetKind: asset.AssetKind_ASSET_FUNCTION, + Asset: &asset.Event_Function{Function: function}, + Metadata: map[string]string{ + "reason": reason, + }, + } + err = s.GetEventService().RegisterEvents(event) + if err != nil { + e.Err = err + return + } +} + +func (s *FunctionService) onFailure(e *fsm.Event) { + if len(e.Args) != 2 { + e.Err = orcerrors.NewInternal(fmt.Sprintf("cannot handle state change with argument: %v", e.Args)) + return + } + + function, ok := e.Args[0].(*asset.Function) + if !ok { + e.Err = orcerrors.NewInternal("cannot cast argument into function") + return + } + + err := s.GetComputeTaskService().propagateFunctionCancelation(function.Key, function.Owner) + if err != nil { + s.GetLogger().Error(). + Err(err). + Str("functionKey", function.Key). + Msg("failed to propagate function action") + } +} diff --git a/lib/service/functionstate_test.go b/lib/service/functionstate_test.go new file mode 100644 index 00000000..4beac58d --- /dev/null +++ b/lib/service/functionstate_test.go @@ -0,0 +1,146 @@ +package service + +import ( + "context" + "testing" + + "github.com/looplab/fsm" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/substra/orchestrator/lib/asset" + "github.com/substra/orchestrator/lib/persistence" +) + +func TestOnFunctionStateChange(t *testing.T) { + updater := new(mockFunctionStateUpdater) + updater.On("onStateChange", mock.Anything).Once() + + state := newFunctionState(updater, &asset.Function{Status: asset.FunctionStatus_FUNCTION_STATUS_WAITING, Key: "uuid"}) + + err := state.Event(context.Background(), string(transitionFunctionBuilding), &asset.Function{}) + + assert.NoError(t, err) + updater.AssertExpectations(t) +} + +// Make sure fsm returns expected errors +func TestFailedFunctionStateChange(t *testing.T) { + updater := new(mockFunctionStateUpdater) + + state := newFunctionState(updater, &asset.Function{Status: asset.FunctionStatus_FUNCTION_STATUS_BUILDING, Key: "uuid"}) + + err := state.Event(context.Background(), string(transitionFunctionBuilding), &asset.Function{}) + assert.IsType(t, fsm.InvalidEventError{}, err) + + state = newFunctionState(updater, &asset.Function{Status: asset.FunctionStatus_FUNCTION_STATUS_READY, Key: "uuid"}) + err = state.Event(context.Background(), string(transitionFunctionCanceled), &asset.Function{}) + assert.IsType(t, fsm.InvalidEventError{}, err) + updater.AssertExpectations(t) +} + +func TestDispatchOnFunctionTransition(t *testing.T) { + dbal := new(persistence.MockDBAL) + es := new(MockEventAPI) + provider := newMockedProvider() + + provider.On("GetFunctionDBAL").Return(dbal) + provider.On("GetEventService").Return(es) + + service := NewFunctionService(provider) + + returnedFunction := &asset.Function{ + Key: "uuid", + Status: asset.FunctionStatus_FUNCTION_STATUS_WAITING, + Owner: "owner", + } + dbal.On("GetFunction", "uuid").Return(returnedFunction, nil) + + expectedFunction := &asset.Function{ + Key: "uuid", + Status: asset.FunctionStatus_FUNCTION_STATUS_BUILDING, + Owner: "owner", + } + dbal.On("UpdateFunction", expectedFunction).Once().Return(nil) + + expectedEvent := &asset.Event{ + AssetKey: "uuid", + AssetKind: asset.AssetKind_ASSET_FUNCTION, + EventKind: asset.EventKind_EVENT_ASSET_UPDATED, + Asset: &asset.Event_Function{Function: expectedFunction}, + Metadata: map[string]string{ + "reason": "User action", + }, + } + es.On("RegisterEvents", expectedEvent).Once().Return(nil) + + err := service.ApplyFunctionAction("uuid", asset.FunctionAction_FUNCTION_ACTION_BUILDING, "", "owner") + assert.NoError(t, err) + + dbal.AssertExpectations(t) + es.AssertExpectations(t) + provider.AssertExpectations(t) +} + +// Testing that failing a Function propagate to tasks using this function +func TestUpdateFunctionStateCanceled(t *testing.T) { + dbal := new(persistence.MockDBAL) + es := new(MockEventAPI) + provider := newMockedProvider() + + provider.On("GetFunctionDBAL").Return(dbal) + provider.On("GetEventService").Return(es) + + // function is retrieved from persistence layer + dbal.On("GetFunction", "uuid").Return(&asset.Function{ + Key: "uuid", + Status: asset.FunctionStatus_FUNCTION_STATUS_WAITING, + Owner: "owner", + }, nil) + // An update event should be enqueued + es.On("RegisterEvents", mock.Anything).Return(nil) + // Updated function should be saved + updatedFunction := &asset.Function{Key: "uuid", Status: asset.FunctionStatus_FUNCTION_STATUS_CANCELED, Owner: "owner"} + dbal.On("UpdateFunction", updatedFunction).Return(nil) + + service := NewFunctionService(provider) + + err := service.ApplyFunctionAction("uuid", asset.FunctionAction_FUNCTION_ACTION_CANCELED, "", "owner") + assert.NoError(t, err) + + dbal.AssertExpectations(t) + es.AssertExpectations(t) +} + +func TestUpdateFunctionStateFailed(t *testing.T) { + dbal := new(persistence.MockDBAL) + es := new(MockEventAPI) + ct := new(MockComputeTaskAPI) + provider := newMockedProvider() + + provider.On("GetFunctionDBAL").Return(dbal) + provider.On("GetComputeTaskService").Return(ct) + provider.On("GetEventService").Return(es) + + functionKey := "uuid" + + dbal.On("GetFunction", "uuid").Return(&asset.Function{ + Key: functionKey, + Status: asset.FunctionStatus_FUNCTION_STATUS_BUILDING, + Owner: "owner", + }, nil) + + ct.On("propagateFunctionCancelation", functionKey, "owner").Return(nil) + es.On("RegisterEvents", mock.Anything).Return(nil) + + updatedFunction := &asset.Function{Key: functionKey, Status: asset.FunctionStatus_FUNCTION_STATUS_FAILED, Owner: "owner"} + + dbal.On("UpdateFunction", updatedFunction).Return(nil) + + service := NewFunctionService(provider) + + err := service.ApplyFunctionAction("uuid", asset.FunctionAction_FUNCTION_ACTION_FAILED, "", "owner") + assert.NoError(t, err) + + dbal.AssertExpectations(t) + es.AssertExpectations(t) +} diff --git a/server/distributed/adapters/failure_report.go b/server/distributed/adapters/failure_report.go index e507475c..4c14e616 100644 --- a/server/distributed/adapters/failure_report.go +++ b/server/distributed/adapters/failure_report.go @@ -35,7 +35,7 @@ func (a *FailureReportAdapter) RegisterFailureReport(ctx context.Context, newFai // In this very specific case we are in a retry context after a timeout. // We can assume that the previous request succeeded and created the asset. // So we convert the error in a success response. - err = invocator.Call(ctx, "orchestrator.failurereport:GetFailureReport", &asset.GetFailureReportParam{ComputeTaskKey: newFailureReport.ComputeTaskKey}, failureReport) + err = invocator.Call(ctx, "orchestrator.failurereport:GetFailureReport", &asset.GetFailureReportParam{AssetKey: newFailureReport.AssetKey}, failureReport) return failureReport, err } diff --git a/server/distributed/adapters/function.go b/server/distributed/adapters/function.go index 8d79d448..398c38d1 100644 --- a/server/distributed/adapters/function.go +++ b/server/distributed/adapters/function.go @@ -87,3 +87,18 @@ func (a *FunctionAdapter) UpdateFunction(ctx context.Context, query *asset.Updat return response, err } + +func (a *FunctionAdapter) ApplyFunctionAction(ctx context.Context, param *asset.ApplyFunctionActionParam) (*asset.ApplyFunctionActionResponse, error) { + invocator, err := interceptors.ExtractInvocator(ctx) + if err != nil { + return nil, err + } + method := "orchestrator.function:ApplyFunctionAction" + + err = invocator.Call(ctx, method, param, nil) + if err != nil { + return nil, err + } + + return &asset.ApplyFunctionActionResponse{}, nil +} diff --git a/server/standalone/dbal/computetask.go b/server/standalone/dbal/computetask.go index 27fdf514..a430dc26 100644 --- a/server/standalone/dbal/computetask.go +++ b/server/standalone/dbal/computetask.go @@ -347,6 +347,47 @@ func (d *DBAL) GetComputePlanTasksKeys(key string) ([]string, error) { return keys, nil } +// GetFunctionRunnableTasks returns the list of tasks linked with a function +// that are running or going to +func (d *DBAL) GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error) { + stmt := getStatementBuilder(). + Select("key", "compute_plan_key", "status", "worker", "owner", "rank", "creation_date", + "logs_permission", "metadata", "function_key"). + From("compute_tasks"). + Where(sq.Eq{ + "function_key": key, + "status": []asset.ComputeTaskStatus{ + asset.ComputeTaskStatus_STATUS_DOING, + asset.ComputeTaskStatus_STATUS_TODO, + }}) + + rows, err := d.query(stmt) + if err != nil { + return nil, err + } + defer rows.Close() + + tasks := []*asset.ComputeTask{} + for rows.Next() { + ct := new(sqlComputeTask) + + err = rows.Scan( + &ct.Key, &ct.ComputePlanKey, &ct.Status, &ct.Worker, &ct.Owner, &ct.Rank, &ct.CreationDate, + &ct.LogsPermission, &ct.Metadata, &ct.FunctionKey) + if err != nil { + return nil, err + } + task, err := ct.toComputeTask() + if err != nil { + return nil, err + } + + tasks = append(tasks, task) + } + + return tasks, nil +} + func (d *DBAL) CountComputeTaskRegisteredOutputs(key string) (persistence.ComputeTaskOutputCounter, error) { counter := make(persistence.ComputeTaskOutputCounter) diff --git a/server/standalone/dbal/failure_report.go b/server/standalone/dbal/failure_report.go index a01cd045..0938f302 100644 --- a/server/standalone/dbal/failure_report.go +++ b/server/standalone/dbal/failure_report.go @@ -13,20 +13,22 @@ import ( ) type sqlFailureReport struct { - ComputeTaskKey string - ErrorType asset.ErrorType - CreationDate time.Time - Owner string - LogsChecksum pgtype.Text - LogsAddress pgtype.Text + AssetKey string + AssetType asset.FailedAssetKind + ErrorType asset.ErrorType + CreationDate time.Time + Owner string + LogsChecksum pgtype.Text + LogsAddress pgtype.Text } func (r sqlFailureReport) toFailureReport() *asset.FailureReport { failureReport := &asset.FailureReport{ - ComputeTaskKey: r.ComputeTaskKey, - ErrorType: r.ErrorType, - CreationDate: timestamppb.New(r.CreationDate), - Owner: r.Owner, + AssetKey: r.AssetKey, + AssetType: r.AssetType, + ErrorType: r.ErrorType, + CreationDate: timestamppb.New(r.CreationDate), + Owner: r.Owner, } if r.LogsAddress.Status == pgtype.Present { @@ -39,11 +41,11 @@ func (r sqlFailureReport) toFailureReport() *asset.FailureReport { return failureReport } -func (d *DBAL) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) { +func (d *DBAL) GetFailureReport(assetKey string) (*asset.FailureReport, error) { stmt := getStatementBuilder(). - Select("compute_task_key", "error_type", "creation_date", "owner", "logs_address", "logs_checksum"). + Select("asset_key", "asset_type", "error_type", "creation_date", "owner", "logs_address", "logs_checksum"). From("expanded_failure_reports"). - Where(sq.Eq{"channel": d.channel, "compute_task_key": computeTaskKey}) + Where(sq.Eq{"channel": d.channel, "asset_key": assetKey}) row, err := d.queryRow(stmt) if err != nil { @@ -51,11 +53,11 @@ func (d *DBAL) GetFailureReport(computeTaskKey string) (*asset.FailureReport, er } r := new(sqlFailureReport) - err = row.Scan(&r.ComputeTaskKey, &r.ErrorType, &r.CreationDate, &r.Owner, &r.LogsAddress, &r.LogsChecksum) + err = row.Scan(&r.AssetKey, &r.AssetType, &r.ErrorType, &r.CreationDate, &r.Owner, &r.LogsAddress, &r.LogsChecksum) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - return nil, orcerrors.NewNotFound("failure report", computeTaskKey) + return nil, orcerrors.NewNotFound("failure report", assetKey) } return nil, err } @@ -81,8 +83,8 @@ func (d *DBAL) AddFailureReport(failureReport *asset.FailureReport) error { stmt := getStatementBuilder(). Insert("failure_reports"). - Columns("compute_task_key", "channel", "error_type", "creation_date", "owner", "logs_address"). - Values(failureReport.ComputeTaskKey, d.channel, failureReport.ErrorType, failureReport.CreationDate.AsTime(), failureReport.Owner, logsAddress) + Columns("asset_key", "asset_type", "channel", "error_type", "creation_date", "owner", "logs_address"). + Values(failureReport.AssetKey, failureReport.AssetType.String(), d.channel, failureReport.ErrorType, failureReport.CreationDate.AsTime(), failureReport.Owner, logsAddress) return d.exec(stmt) } diff --git a/server/standalone/dbal/failure_report_test.go b/server/standalone/dbal/failure_report_test.go index fe83ed57..30aebc34 100644 --- a/server/standalone/dbal/failure_report_test.go +++ b/server/standalone/dbal/failure_report_test.go @@ -18,9 +18,9 @@ func TestGetFailureReportNotFound(t *testing.T) { mock.ExpectBegin() - computeTaskKey := "4c67ad88-309a-48b4-8bc4-c2e2c1a87a83" + assetKey := "4c67ad88-309a-48b4-8bc4-c2e2c1a87a83" mock.ExpectQuery(`SELECT .* FROM expanded_failure_reports`). - WithArgs(testChannel, computeTaskKey). + WithArgs(assetKey, testChannel). WillReturnError(pgx.ErrNoRows) tx, err := mock.Begin(context.Background()) @@ -28,7 +28,7 @@ func TestGetFailureReportNotFound(t *testing.T) { dbal := &DBAL{ctx: context.TODO(), tx: tx, channel: testChannel} - _, err = dbal.GetFailureReport(computeTaskKey) + _, err = dbal.GetFailureReport(assetKey) assert.Error(t, err) orcError := new(orcerrors.OrcError) diff --git a/server/standalone/dbal/function.go b/server/standalone/dbal/function.go index 76b48789..2843bb2b 100644 --- a/server/standalone/dbal/function.go +++ b/server/standalone/dbal/function.go @@ -22,6 +22,7 @@ type sqlFunction struct { Owner string CreationDate time.Time Metadata map[string]string + Status asset.FunctionStatus } func (a *sqlFunction) toFunction() *asset.Function { @@ -34,6 +35,7 @@ func (a *sqlFunction) toFunction() *asset.Function { Owner: a.Owner, CreationDate: timestamppb.New(a.CreationDate), Metadata: a.Metadata, + Status: a.Status, } } @@ -51,8 +53,8 @@ func (d *DBAL) AddFunction(function *asset.Function) error { stmt := getStatementBuilder(). Insert("functions"). - Columns("key", "channel", "name", "description", "functionAddress", "permissions", "owner", "creation_date", "metadata"). - Values(function.Key, d.channel, function.Name, function.Description.StorageAddress, function.Function.StorageAddress, function.Permissions, function.Owner, function.CreationDate.AsTime(), function.Metadata) + Columns("key", "channel", "name", "description", "functionAddress", "permissions", "owner", "creation_date", "metadata", "status"). + Values(function.Key, d.channel, function.Name, function.Description.StorageAddress, function.Function.StorageAddress, function.Permissions, function.Owner, function.CreationDate.AsTime(), function.Metadata, function.Status.String()) err = d.exec(stmt) if err != nil { @@ -75,7 +77,7 @@ func (d *DBAL) AddFunction(function *asset.Function) error { // GetFunction implements persistence.FunctionDBAL func (d *DBAL) GetFunction(key string) (*asset.Function, error) { stmt := getStatementBuilder(). - Select("key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata"). + Select("key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata", "status"). From("expanded_functions"). Where(sq.Eq{"key": key, "channel": d.channel}) @@ -85,7 +87,7 @@ func (d *DBAL) GetFunction(key string) (*asset.Function, error) { } al := sqlFunction{} - err = row.Scan(&al.Key, &al.Name, &al.Description.StorageAddress, &al.Description.Checksum, &al.Function.StorageAddress, &al.Function.Checksum, &al.Permissions, &al.Owner, &al.CreationDate, &al.Metadata) + err = row.Scan(&al.Key, &al.Name, &al.Description.StorageAddress, &al.Description.Checksum, &al.Function.StorageAddress, &al.Function.Checksum, &al.Permissions, &al.Owner, &al.CreationDate, &al.Metadata, &al.Status) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -144,7 +146,7 @@ func (d *DBAL) queryFunctions(p *common.Pagination, filter *asset.FunctionQueryF } stmt := getStatementBuilder(). - Select("key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata"). + Select("key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata", "status"). From("expanded_functions"). Where(sq.Eq{"channel": d.channel}). OrderByClause("creation_date ASC, key"). @@ -173,7 +175,7 @@ func (d *DBAL) queryFunctions(p *common.Pagination, filter *asset.FunctionQueryF for rows.Next() { al := sqlFunction{} - err = rows.Scan(&al.Key, &al.Name, &al.Description.StorageAddress, &al.Description.Checksum, &al.Function.StorageAddress, &al.Function.Checksum, &al.Permissions, &al.Owner, &al.CreationDate, &al.Metadata) + err = rows.Scan(&al.Key, &al.Name, &al.Description.StorageAddress, &al.Description.Checksum, &al.Function.StorageAddress, &al.Function.Checksum, &al.Permissions, &al.Owner, &al.CreationDate, &al.Metadata, &al.Status) if err != nil { return nil, "", err } @@ -198,11 +200,12 @@ func (d *DBAL) queryFunctions(p *common.Pagination, filter *asset.FunctionQueryF return functions, bookmark, nil } -// UpdateFunction updates the mutable fields of an function in the DB. List of mutable fields: name. +// UpdateFunction updates the mutable fields of an function in the DB. List of mutable fields: name, status. func (d *DBAL) UpdateFunction(function *asset.Function) error { stmt := getStatementBuilder(). Update("functions"). Set("name", function.Name). + Set("status", function.Status.String()). Where(sq.Eq{"channel": d.channel, "key": function.Key}) return d.exec(stmt) diff --git a/server/standalone/dbal/function_test.go b/server/standalone/dbal/function_test.go index 6b90cd7d..57f064da 100644 --- a/server/standalone/dbal/function_test.go +++ b/server/standalone/dbal/function_test.go @@ -18,10 +18,10 @@ import ( func makeFunctionRows(keys ...string) *pgxmock.Rows { permissions := []byte(`{"process": {"public": true}, "download": {"public": true}}`) - res := pgxmock.NewRows([]string{"key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata"}) + res := pgxmock.NewRows([]string{"key", "name", "description_address", "description_checksum", "function_address", "function_checksum", "permissions", "owner", "creation_date", "metadata", "status"}) for _, key := range keys { - res.AddRow(key, "name", "address", "checksum", "address", "checksum", permissions, "owner", time.Unix(1337, 0), map[string]string{}) + res.AddRow(key, "name", "address", "checksum", "address", "checksum", permissions, "owner", time.Unix(1337, 0), map[string]string{}, asset.FunctionStatus_FUNCTION_STATUS_WAITING.String()) } return res @@ -59,7 +59,7 @@ func TestQueryFunctions(t *testing.T) { mock.ExpectBegin() - mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions`). + mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions`). WithArgs(testChannel, computePlanKey).WillReturnRows(makeFunctionRows("key1", "key2")) mock.ExpectQuery(regexp.QuoteMeta(`SELECT function_key, identifier, kind, multiple, optional FROM function_inputs WHERE function_key IN ($1,$2)`)). @@ -95,7 +95,7 @@ func TestPaginatedQueryFunctions(t *testing.T) { mock.ExpectBegin() - mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions`). + mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions`). WithArgs(testChannel, computePlanKey).WillReturnRows(makeFunctionRows("key1", "key2")) mock.ExpectQuery(regexp.QuoteMeta(`SELECT function_key, identifier, kind, multiple, optional FROM function_inputs WHERE function_key IN ($1)`)). @@ -128,7 +128,7 @@ func TestGetFunction(t *testing.T) { mock.ExpectBegin() uid := "key1" - mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions`).WillReturnRows(makeFunctionRows("key1")) + mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions`).WillReturnRows(makeFunctionRows("key1")) mock.ExpectQuery(regexp.QuoteMeta(`SELECT function_key, identifier, kind, multiple, optional FROM function_inputs WHERE function_key IN ($1)`)). WithArgs("key1").WillReturnRows(makeFunctionInputRows("key1")) @@ -156,7 +156,7 @@ func TestGetFunctionFail(t *testing.T) { mock.ExpectBegin() uid := "4c67ad88-309a-48b4-8bc4-c2e2c1a87a83" - mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions`).WillReturnError(pgx.ErrNoRows) + mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions`).WillReturnError(pgx.ErrNoRows) tx, err := mock.Begin(context.Background()) require.NoError(t, err) @@ -176,7 +176,7 @@ func TestQueryFunctionsByComputePlan(t *testing.T) { mock.ExpectBegin() - mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions .* key IN \(SELECT DISTINCT`). + mock.ExpectQuery(`SELECT key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions .* key IN \(SELECT DISTINCT`). WithArgs(testChannel, "CPKey").WillReturnRows(makeFunctionRows("key1", "key2")) mock.ExpectQuery(regexp.QuoteMeta(`SELECT function_key, identifier, kind, multiple, optional FROM function_inputs WHERE function_key IN ($1,$2)`)). @@ -203,7 +203,7 @@ func TestQueryFunctionsNilFilter(t *testing.T) { mock.ExpectBegin() - mock.ExpectQuery(`key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata FROM expanded_functions`). + mock.ExpectQuery(`key, name, description_address, description_checksum, function_address, function_checksum, permissions, owner, creation_date, metadata, status FROM expanded_functions`). WithArgs(testChannel). WillReturnRows(makeFunctionRows("key1", "key2")) diff --git a/server/standalone/handlers/failure_report.go b/server/standalone/handlers/failure_report.go index 35b11341..c932c4e1 100644 --- a/server/standalone/handlers/failure_report.go +++ b/server/standalone/handlers/failure_report.go @@ -36,5 +36,5 @@ func (s *FailureReportServer) GetFailureReport(ctx context.Context, in *asset.Ge if err != nil { return nil, err } - return services.GetFailureReportService().GetFailureReport(in.ComputeTaskKey) + return services.GetFailureReportService().GetFailureReport(in.AssetKey) } diff --git a/server/standalone/handlers/function.go b/server/standalone/handlers/function.go index ce85d139..008b296d 100644 --- a/server/standalone/handlers/function.go +++ b/server/standalone/handlers/function.go @@ -83,3 +83,21 @@ func (s *FunctionServer) UpdateFunction(ctx context.Context, params *asset.Updat return &asset.UpdateFunctionResponse{}, nil } + +func (s *FunctionServer) ApplyFunctionAction(ctx context.Context, param *asset.ApplyFunctionActionParam) (*asset.ApplyFunctionActionResponse, error) { + requester, err := commonInterceptors.ExtractMSPID(ctx) + if err != nil { + return nil, err + } + provider, err := interceptors.ExtractProvider(ctx) + if err != nil { + return nil, err + } + + err = provider.GetFunctionService().ApplyFunctionAction(param.FunctionKey, param.Action, "", requester) + if err != nil { + return nil, err + } + + return &asset.ApplyFunctionActionResponse{}, nil +} diff --git a/server/standalone/migration/000058_add-function-status.up.sql b/server/standalone/migration/000058_add-function-status.up.sql new file mode 100644 index 00000000..cb16a5c2 --- /dev/null +++ b/server/standalone/migration/000058_add-function-status.up.sql @@ -0,0 +1,40 @@ +SELECT execute($$ + CREATE TYPE function_status AS ENUM ( + 'FUNCTION_STATUS_UNKNOWN', + 'FUNCTION_STATUS_WAITING', + 'FUNCTION_STATUS_BUILDING', + 'FUNCTION_STATUS_READY', + 'FUNCTION_STATUS_CANCELED', + 'FUNCTION_STATUS_FAILED' + ); + + ALTER TABLE functions + ADD COLUMN status function_status DEFAULT 'FUNCTION_STATUS_UNKNOWN'; + ALTER TABLE functions + ALTER COLUMN status DROP DEFAULT; + + DROP VIEW IF EXISTS expanded_functions; + CREATE VIEW expanded_functions AS + SELECT key, + name, + description AS description_address, + desc_add.checksum AS description_checksum, + functionAddress AS function_address, + function_add.checksum AS function_checksum, + permissions, + owner, + creation_date, + metadata, + channel, + status + FROM functions + JOIN addressables desc_add ON functions.description = desc_add.storage_address + JOIN addressables function_add ON functions.functionAddress = function_add.storage_address; + + UPDATE events e + SET asset = jsonb_set(asset, '{status}', to_jsonb('FUNCTION_STATUS_UNKNOWN'::function_status)) + WHERE asset_kind = 'ASSET_FUNCTION'; + + CREATE INDEX ix_compute_tasks_function_key_status ON compute_tasks (function_key, status); + +$$) WHERE NOT column_exists('public', 'functions', 'status'); diff --git a/server/standalone/migration/000059_modify-failure-report.up.sql b/server/standalone/migration/000059_modify-failure-report.up.sql new file mode 100644 index 00000000..d175cc8d --- /dev/null +++ b/server/standalone/migration/000059_modify-failure-report.up.sql @@ -0,0 +1,37 @@ +SELECT execute($$ + CREATE TYPE failed_asset_kind AS ENUM ( + 'FAILED_ASSET_UNKNOWN', + 'FAILED_ASSET_COMPUTE_TASK', + 'FAILED_ASSET_FUNCTION' + ); + + ALTER TABLE failure_reports + RENAME COLUMN compute_task_key TO asset_key; + ALTER TABLE failure_reports + ADD COLUMN asset_type failed_asset_kind DEFAULT 'FAILED_ASSET_COMPUTE_TASK'; + ALTER TABLE failure_reports + ALTER COLUMN asset_type SET DEFAULT 'FAILED_ASSET_UNKNOWN'; + ALTER TABLE failure_reports + DROP CONSTRAINT failure_reports_compute_task_key_fkey; + + DROP VIEW IF EXISTS expanded_failure_reports; + CREATE VIEW expanded_failure_reports AS + SELECT asset_key, + asset_type, + error_type, + logs_address, + a.checksum AS logs_checksum, + creation_date, + owner, + channel + FROM failure_reports + LEFT JOIN addressables a ON failure_reports.logs_address = a.storage_address; + + UPDATE events + SET asset = jsonb_set(asset, '{assetKey}', asset->'computeTaskKey') - 'computeTaskKey' + WHERE asset_kind = 'ASSET_FAILURE_REPORT' AND NOT(asset ? 'assetKey'); + UPDATE events e + SET asset = jsonb_set(asset, '{assetType}', to_jsonb('FAILED_ASSET_COMPUTE_TASK'::failed_asset_kind)) + WHERE asset_kind = 'ASSET_FAILURE_REPORT'; + +$$) WHERE NOT column_exists('public', 'failure_reports', 'asset_type'); From 0245a558d7bd64732e0c7e1fe0ac437d197a436f Mon Sep 17 00:00:00 2001 From: thbcmlowk Date: Wed, 11 Oct 2023 15:46:19 +0000 Subject: [PATCH 2/4] [auto] generate database diagram Signed-off-by: thbcmlowk --- docs/schemas/standalone-database.svg | 1531 +++++++++++++------------- 1 file changed, 765 insertions(+), 766 deletions(-) diff --git a/docs/schemas/standalone-database.svg b/docs/schemas/standalone-database.svg index d512bca7..e34827ac 100644 --- a/docs/schemas/standalone-database.svg +++ b/docs/schemas/standalone-database.svg @@ -4,994 +4,993 @@ - + postgres - + public.schema_migrations - - -public.schema_migrations -     -[BASE TABLE] - -version -[bigint] - -dirty -[boolean] + + +public.schema_migrations +     +[BASE TABLE] + +version +[bigint] + +dirty +[boolean] public.organizations - - -public.organizations -     -[BASE TABLE] - -id -[varchar(100)] - -channel -[varchar(100)] - -creation_date -[timestamp with time zone] - -address -[varchar(200)] + + +public.organizations +     +[BASE TABLE] + +id +[varchar(100)] + +channel +[varchar(100)] + +creation_date +[timestamp with time zone] + +address +[varchar(200)] public.datasamples - - -public.datasamples -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -owner -[varchar(100)] - -checksum -[varchar(64)] - -creation_date -[timestamp with time zone] + + +public.datasamples +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +owner +[varchar(100)] + +checksum +[varchar(64)] + +creation_date +[timestamp with time zone] public.datasamples:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.functions - - -public.functions -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -name -[varchar(100)] - -description -[varchar(200)] - -functionaddress -[varchar(200)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] - -status -[function_status] + + +public.functions +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +name +[varchar(100)] + +description +[varchar(200)] + +functionaddress +[varchar(200)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +status +[function_status] public.functions:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.addressables - - -public.addressables -     -[BASE TABLE] - -storage_address -[varchar(200)] - -checksum -[varchar(64)] + + +public.addressables +     +[BASE TABLE] + +storage_address +[varchar(200)] + +checksum +[varchar(64)] public.functions:functionaddress->public.addressables:storage_address - - -FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) + + +FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) public.functions:description->public.addressables:storage_address - - -FOREIGN KEY (description) REFERENCES addressables(storage_address) + + +FOREIGN KEY (description) REFERENCES addressables(storage_address) public.datamanagers - - -public.datamanagers -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -name -[varchar(100)] - -owner -[varchar(100)] - -permissions -[jsonb] - -description -[varchar(200)] - -opener -[varchar(200)] - -type -[varchar(30)] - -creation_date -[timestamp with time zone] - -logs_permission -[jsonb] - -metadata -[jsonb] + + +public.datamanagers +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +name +[varchar(100)] + +owner +[varchar(100)] + +permissions +[jsonb] + +description +[varchar(200)] + +opener +[varchar(200)] + +type +[varchar(30)] + +creation_date +[timestamp with time zone] + +logs_permission +[jsonb] + +metadata +[jsonb] public.datamanagers:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.datamanagers:description->public.addressables:storage_address - - -FOREIGN KEY (description) REFERENCES addressables(storage_address) + + +FOREIGN KEY (description) REFERENCES addressables(storage_address) public.datamanagers:opener->public.addressables:storage_address - - -FOREIGN KEY (opener) REFERENCES addressables(storage_address) + + +FOREIGN KEY (opener) REFERENCES addressables(storage_address) public.compute_tasks - - -public.compute_tasks -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -compute_plan_key -[uuid] - -status -[varchar(100)] - -worker -[varchar(100)] - -function_key -[uuid] - -owner -[varchar(100)] - -rank -[integer] - -creation_date -[timestamp with time zone] - -logs_permission -[jsonb] - -metadata -[jsonb] + + +public.compute_tasks +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +compute_plan_key +[uuid] + +status +[varchar(100)] + +worker +[varchar(100)] + +function_key +[uuid] + +owner +[varchar(100)] + +rank +[integer] + +creation_date +[timestamp with time zone] + +logs_permission +[jsonb] + +metadata +[jsonb] public.compute_tasks:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.compute_tasks:worker->public.organizations:id - - -FOREIGN KEY (worker, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (worker, channel) REFERENCES organizations(id, channel) public.compute_tasks:function_key->public.functions:key - - -FOREIGN KEY (function_key) REFERENCES functions(key) + + +FOREIGN KEY (function_key) REFERENCES functions(key) public.compute_plans - - -public.compute_plans -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -tag -[varchar(100)] - -metadata -[jsonb] - -name -[varchar(100)] - -cancelation_date -[timestamp with time zone] - -failure_date -[timestamp with time zone] + + +public.compute_plans +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +tag +[varchar(100)] + +metadata +[jsonb] + +name +[varchar(100)] + +cancelation_date +[timestamp with time zone] + +failure_date +[timestamp with time zone] public.compute_tasks:compute_plan_key->public.compute_plans:key - - -FOREIGN KEY (compute_plan_key) REFERENCES compute_plans(key) + + +FOREIGN KEY (compute_plan_key) REFERENCES compute_plans(key) public.compute_task_statuses - - -public.compute_task_statuses -     -[BASE TABLE] - -status -[varchar(100)] + + +public.compute_task_statuses +     +[BASE TABLE] + +status +[varchar(100)] public.compute_tasks:status->public.compute_task_statuses:status - - -FOREIGN KEY (status) REFERENCES compute_task_statuses(status) + + +FOREIGN KEY (status) REFERENCES compute_task_statuses(status) public.models - - -public.models -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -compute_task_key -[uuid] - -address -[varchar(200)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] + + +public.models +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +compute_task_key +[uuid] + +address +[varchar(200)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] public.models:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.models:compute_task_key->public.compute_tasks:key - - -FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) public.models:address->public.addressables:storage_address - - -FOREIGN KEY (address) REFERENCES addressables(storage_address) + + +FOREIGN KEY (address) REFERENCES addressables(storage_address) public.compute_plans:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.performances - - -public.performances -     -[BASE TABLE] - -compute_task_key -[uuid] - -channel -[varchar(100)] - -performance_value -[numeric] - -creation_date -[timestamp with time zone] - -compute_task_output_identifier -[varchar(100)] + + +public.performances +     +[BASE TABLE] + +compute_task_key +[uuid] + +channel +[varchar(100)] + +performance_value +[numeric] + +creation_date +[timestamp with time zone] + +compute_task_output_identifier +[varchar(100)] - + public.compute_task_outputs - - -public.compute_task_outputs -     -[BASE TABLE] - -compute_task_key -[uuid] - -identifier -[varchar(100)] - -permissions -[jsonb] - -transient -[boolean] + + +public.compute_task_outputs +     +[BASE TABLE] + +compute_task_key +[uuid] + +identifier +[varchar(100)] + +permissions +[jsonb] + +transient +[boolean] public.performances:compute_task_key->public.compute_task_outputs:compute_task_key - - -FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) + + +FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) public.events - - -public.events -     -[BASE TABLE] - -id -[uuid] - -asset_key -[varchar(100)] - -channel -[varchar(100)] - -asset_kind -[varchar(50)] - -event_kind -[varchar(50)] - -timestamp -[timestamp with time zone] - -metadata -[jsonb] - -asset -[jsonb] - -position -[bigint] + + +public.events +     +[BASE TABLE] + +id +[uuid] + +asset_key +[varchar(100)] + +channel +[varchar(100)] + +asset_kind +[varchar(50)] + +event_kind +[varchar(50)] + +timestamp +[timestamp with time zone] + +metadata +[jsonb] + +asset +[jsonb] + +position +[bigint] - + public.asset_kinds - - -public.asset_kinds -     -[BASE TABLE] - -kind -[varchar(50)] + + +public.asset_kinds +     +[BASE TABLE] + +kind +[varchar(50)] public.events:asset_kind->public.asset_kinds:kind - - -FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) - + public.event_kinds - - -public.event_kinds -     -[BASE TABLE] - -kind -[varchar(50)] + + +public.event_kinds +     +[BASE TABLE] + +kind +[varchar(50)] public.events:event_kind->public.event_kinds:kind - - -FOREIGN KEY (event_kind) REFERENCES event_kinds(kind) + + +FOREIGN KEY (event_kind) REFERENCES event_kinds(kind) public.failure_reports - - -public.failure_reports -     -[BASE TABLE] - -compute_task_key -[uuid] - -channel -[varchar(100)] - -error_type -[varchar(50)] - -logs_address -[varchar(200)] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] + + +public.failure_reports +     +[BASE TABLE] + +asset_key +[uuid] + +channel +[varchar(100)] + +error_type +[varchar(50)] + +logs_address +[varchar(200)] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +asset_type +[failed_asset_kind] public.failure_reports:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - - - -public.failure_reports:compute_task_key->public.compute_tasks:key - - -FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - + public.failure_reports:logs_address->public.addressables:storage_address - - -FOREIGN KEY (logs_address) REFERENCES addressables(storage_address) + + +FOREIGN KEY (logs_address) REFERENCES addressables(storage_address) public.error_types - - -public.error_types -     -[BASE TABLE] - -error_type -[varchar(50)] + + +public.error_types +     +[BASE TABLE] + +error_type +[varchar(50)] - + public.failure_reports:error_type->public.error_types:error_type - - -FOREIGN KEY (error_type) REFERENCES error_types(error_type) + + +FOREIGN KEY (error_type) REFERENCES error_types(error_type) public.compute_task_parents - - -public.compute_task_parents -     -[BASE TABLE] - -child_task_key -[uuid] - -parent_task_key -[uuid] - -position -[integer] + + +public.compute_task_parents +     +[BASE TABLE] + +child_task_key +[uuid] + +parent_task_key +[uuid] + +position +[integer] - + public.compute_task_parents:child_task_key->public.compute_tasks:key - - -FOREIGN KEY (child_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (child_task_key) REFERENCES compute_tasks(key) - + public.compute_task_parents:parent_task_key->public.compute_tasks:key - - -FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) - - - -public.expanded_failure_reports - - -public.expanded_failure_reports -     -[VIEW] - -compute_task_key -[uuid] - -error_type -[varchar(50)] - -logs_address -[varchar(200)] - -logs_checksum -[varchar(64)] - -creation_date -[timestamp with time zone] - -owner -[varchar(100)] - -channel -[varchar(100)] + + +FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) - + public.expanded_datamanagers - - -public.expanded_datamanagers -     -[VIEW] - -key -[uuid] - -name -[varchar(100)] - -owner -[varchar(100)] - -channel -[varchar(100)] - -permissions -[jsonb] - -description_address -[varchar(200)] - -description_checksum -[varchar(64)] - -opener_address -[varchar(200)] - -opener_checksum -[varchar(64)] - -type -[varchar(30)] - -creation_date -[timestamp with time zone] - -logs_permission -[jsonb] - -metadata -[jsonb] + + +public.expanded_datamanagers +     +[VIEW] + +key +[uuid] + +name +[varchar(100)] + +owner +[varchar(100)] + +channel +[varchar(100)] + +permissions +[jsonb] + +description_address +[varchar(200)] + +description_checksum +[varchar(64)] + +opener_address +[varchar(200)] + +opener_checksum +[varchar(64)] + +type +[varchar(30)] + +creation_date +[timestamp with time zone] + +logs_permission +[jsonb] + +metadata +[jsonb] - + public.datasample_datamanagers - - -public.datasample_datamanagers -     -[BASE TABLE] - -datasample_key -[uuid] - -datamanager_key -[uuid] + + +public.datasample_datamanagers +     +[BASE TABLE] + +datasample_key +[uuid] + +datamanager_key +[uuid] - + public.datasample_datamanagers:datasample_key->public.datasamples:key - - -FOREIGN KEY (datasample_key) REFERENCES datasamples(key) + + +FOREIGN KEY (datasample_key) REFERENCES datasamples(key) - + public.datasample_datamanagers:datamanager_key->public.datamanagers:key - - -FOREIGN KEY (datamanager_key) REFERENCES datamanagers(key) + + +FOREIGN KEY (datamanager_key) REFERENCES datamanagers(key) - + public.function_inputs - - -public.function_inputs -     -[BASE TABLE] - -function_key -[uuid] - -identifier -[varchar(100)] - -kind -[varchar(50)] - -multiple -[boolean] - -optional -[boolean] + + +public.function_inputs +     +[BASE TABLE] + +function_key +[uuid] + +identifier +[varchar(100)] + +kind +[varchar(50)] + +multiple +[boolean] + +optional +[boolean] - + public.function_inputs:function_key->public.functions:key - - -FOREIGN KEY (function_key) REFERENCES functions(key) + + +FOREIGN KEY (function_key) REFERENCES functions(key) - + public.function_inputs:kind->public.asset_kinds:kind - - -FOREIGN KEY (kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (kind) REFERENCES asset_kinds(kind) - + public.function_outputs - - -public.function_outputs -     -[BASE TABLE] - -function_key -[uuid] - -identifier -[varchar(100)] - -kind -[varchar(50)] - -multiple -[boolean] + + +public.function_outputs +     +[BASE TABLE] + +function_key +[uuid] + +identifier +[varchar(100)] + +kind +[varchar(50)] + +multiple +[boolean] - + public.function_outputs:function_key->public.functions:key - - -FOREIGN KEY (function_key) REFERENCES functions(key) + + +FOREIGN KEY (function_key) REFERENCES functions(key) - + public.function_outputs:kind->public.asset_kinds:kind - - -FOREIGN KEY (kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (kind) REFERENCES asset_kinds(kind) - + public.compute_task_inputs - - -public.compute_task_inputs -     -[BASE TABLE] - -compute_task_key -[uuid] - -identifier -[varchar(100)] - -position -[integer] - -asset_key -[uuid] - -parent_task_key -[uuid] - -parent_task_output_identifier -[varchar(100)] + + +public.compute_task_inputs +     +[BASE TABLE] + +compute_task_key +[uuid] + +identifier +[varchar(100)] + +position +[integer] + +asset_key +[uuid] + +parent_task_key +[uuid] + +parent_task_output_identifier +[varchar(100)] - + public.compute_task_inputs:compute_task_key->public.compute_tasks:key - - -FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) - + public.compute_task_inputs:parent_task_key->public.compute_tasks:key - - -FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) - + public.compute_task_outputs:compute_task_key->public.compute_tasks:key - - -FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) + + +FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) - + public.compute_task_output_assets - - -public.compute_task_output_assets -     -[BASE TABLE] - -compute_task_key -[uuid] - -compute_task_output_identifier -[varchar(100)] - -position -[integer] - -asset_kind -[varchar(50)] - -asset_key -[varchar(200)] + + +public.compute_task_output_assets +     +[BASE TABLE] + +compute_task_key +[uuid] + +compute_task_output_identifier +[varchar(100)] + +position +[integer] + +asset_kind +[varchar(50)] + +asset_key +[varchar(200)] - + public.compute_task_output_assets:asset_kind->public.asset_kinds:kind - - -FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) - + public.compute_task_output_assets:compute_task_key->public.compute_task_outputs:compute_task_key - - -FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) + + +FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) - + public.expanded_models - - -public.expanded_models -     -[VIEW] - -key -[uuid] - -compute_task_key -[uuid] - -address -[varchar(200)] - -checksum -[varchar(64)] - -permissions -[jsonb] - -owner -[varchar(100)] - -channel -[varchar(100)] - -creation_date -[timestamp with time zone] + + +public.expanded_models +     +[VIEW] + +key +[uuid] + +compute_task_key +[uuid] + +address +[varchar(200)] + +checksum +[varchar(64)] + +permissions +[jsonb] + +owner +[varchar(100)] + +channel +[varchar(100)] + +creation_date +[timestamp with time zone] - + public.expanded_datasamples - - -public.expanded_datasamples -     -[VIEW] - -key -[uuid] - -owner -[varchar(100)] - -channel -[varchar(100)] - -checksum -[varchar(64)] - -creation_date -[timestamp with time zone] - -datamanager_keys -[jsonb] + + +public.expanded_datasamples +     +[VIEW] + +key +[uuid] + +owner +[varchar(100)] + +channel +[varchar(100)] + +checksum +[varchar(64)] + +creation_date +[timestamp with time zone] + +datamanager_keys +[jsonb] - + public.expanded_functions - - -public.expanded_functions -     -[VIEW] - -key -[uuid] - -name -[varchar(100)] - -description_address -[varchar(200)] - -description_checksum -[varchar(64)] - -function_address -[varchar(200)] - -function_checksum -[varchar(64)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] - -channel -[varchar(100)] - -status -[function_status] + + +public.expanded_functions +     +[VIEW] + +key +[uuid] + +name +[varchar(100)] + +description_address +[varchar(200)] + +description_checksum +[varchar(64)] + +function_address +[varchar(200)] + +function_checksum +[varchar(64)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +channel +[varchar(100)] + +status +[function_status] + + + +public.expanded_failure_reports + + +public.expanded_failure_reports +     +[VIEW] + +asset_key +[uuid] + +asset_type +[failed_asset_kind] + +error_type +[varchar(50)] + +logs_address +[varchar(200)] + +logs_checksum +[varchar(64)] + +creation_date +[timestamp with time zone] + +owner +[varchar(100)] + +channel +[varchar(100)] From 1c0db1a01c065f1909038b29f1c44f08b4c85bd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Barth=C3=A9s?= Date: Wed, 25 Oct 2023 13:34:13 +0200 Subject: [PATCH 3/4] fix: replace PostgreSQL enums by varchar du to caching errors in e2e MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Guilhem Barthés --- .../000058_add-function-status.up.sql | 24 ++++++++++++------- .../000059_modify-failure-report.up.sql | 13 ++++++++-- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/server/standalone/migration/000058_add-function-status.up.sql b/server/standalone/migration/000058_add-function-status.up.sql index cb16a5c2..8030a845 100644 --- a/server/standalone/migration/000058_add-function-status.up.sql +++ b/server/standalone/migration/000058_add-function-status.up.sql @@ -1,15 +1,21 @@ SELECT execute($$ - CREATE TYPE function_status AS ENUM ( - 'FUNCTION_STATUS_UNKNOWN', - 'FUNCTION_STATUS_WAITING', - 'FUNCTION_STATUS_BUILDING', - 'FUNCTION_STATUS_READY', - 'FUNCTION_STATUS_CANCELED', - 'FUNCTION_STATUS_FAILED' + + CREATE TABLE function_statuses ( + status VARCHAR(100) PRIMARY KEY ); + INSERT INTO function_statuses(status) + VALUES ('FUNCTION_STATUS_UNKNOWN'), + ('FUNCTION_STATUS_WAITING'), + ('FUNCTION_STATUS_BUILDING'), + ('FUNCTION_STATUS_READY'), + ('FUNCTION_STATUS_CANCELED'), + ('FUNCTION_STATUS_FAILED'); + + ALTER TABLE functions - ADD COLUMN status function_status DEFAULT 'FUNCTION_STATUS_UNKNOWN'; + ADD COLUMN status VARCHAR(100) DEFAULT 'FUNCTION_STATUS_UNKNOWN' + CONSTRAINT function_status_fkey REFERENCES function_statuses (status); ALTER TABLE functions ALTER COLUMN status DROP DEFAULT; @@ -32,7 +38,7 @@ SELECT execute($$ JOIN addressables function_add ON functions.functionAddress = function_add.storage_address; UPDATE events e - SET asset = jsonb_set(asset, '{status}', to_jsonb('FUNCTION_STATUS_UNKNOWN'::function_status)) + SET asset = jsonb_set(asset, '{status}', to_jsonb('FUNCTION_STATUS_UNKNOWN'::text)) WHERE asset_kind = 'ASSET_FUNCTION'; CREATE INDEX ix_compute_tasks_function_key_status ON compute_tasks (function_key, status); diff --git a/server/standalone/migration/000059_modify-failure-report.up.sql b/server/standalone/migration/000059_modify-failure-report.up.sql index d175cc8d..344b488b 100644 --- a/server/standalone/migration/000059_modify-failure-report.up.sql +++ b/server/standalone/migration/000059_modify-failure-report.up.sql @@ -4,11 +4,20 @@ SELECT execute($$ 'FAILED_ASSET_COMPUTE_TASK', 'FAILED_ASSET_FUNCTION' ); + CREATE TABLE failed_asset_kinds ( + kind VARCHAR(100) PRIMARY KEY + ); + + INSERT INTO failed_asset_kinds(kind) + VALUES ('FAILED_ASSET_UNKNOWN'), + ('FAILED_ASSET_COMPUTE_TASK'), + ('FAILED_ASSET_FUNCTION'); ALTER TABLE failure_reports RENAME COLUMN compute_task_key TO asset_key; ALTER TABLE failure_reports - ADD COLUMN asset_type failed_asset_kind DEFAULT 'FAILED_ASSET_COMPUTE_TASK'; + ADD COLUMN asset_type VARCHAR(100) DEFAULT 'FAILED_ASSET_COMPUTE_TASK' + CONSTRAINT asset_kind_fkey REFERENCES failed_asset_kinds (kind); ALTER TABLE failure_reports ALTER COLUMN asset_type SET DEFAULT 'FAILED_ASSET_UNKNOWN'; ALTER TABLE failure_reports @@ -31,7 +40,7 @@ SELECT execute($$ SET asset = jsonb_set(asset, '{assetKey}', asset->'computeTaskKey') - 'computeTaskKey' WHERE asset_kind = 'ASSET_FAILURE_REPORT' AND NOT(asset ? 'assetKey'); UPDATE events e - SET asset = jsonb_set(asset, '{assetType}', to_jsonb('FAILED_ASSET_COMPUTE_TASK'::failed_asset_kind)) + SET asset = jsonb_set(asset, '{assetType}', to_jsonb('FAILED_ASSET_COMPUTE_TASK'::text)) WHERE asset_kind = 'ASSET_FAILURE_REPORT'; $$) WHERE NOT column_exists('public', 'failure_reports', 'asset_type'); From 15e17e14ab5d4596fd989b86f667b021e50dc802 Mon Sep 17 00:00:00 2001 From: guilhem-barthes Date: Wed, 25 Oct 2023 11:35:21 +0000 Subject: [PATCH 4/4] [auto] generate database diagram Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- docs/schemas/standalone-database.svg | 960 ++++++++++++++------------- 1 file changed, 499 insertions(+), 461 deletions(-) diff --git a/docs/schemas/standalone-database.svg b/docs/schemas/standalone-database.svg index e34827ac..4e63bf75 100644 --- a/docs/schemas/standalone-database.svg +++ b/docs/schemas/standalone-database.svg @@ -4,11 +4,11 @@ - + postgres - + public.schema_migrations @@ -79,77 +79,96 @@ public.functions - - -public.functions -     -[BASE TABLE] - -key -[uuid] - -channel -[varchar(100)] - -name -[varchar(100)] - -description -[varchar(200)] - -functionaddress -[varchar(200)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] - -status -[function_status] + + +public.functions +     +[BASE TABLE] + +key +[uuid] + +channel +[varchar(100)] + +name +[varchar(100)] + +description +[varchar(200)] + +functionaddress +[varchar(200)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +status +[varchar(100)] public.functions:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) public.addressables - - -public.addressables -     -[BASE TABLE] - -storage_address -[varchar(200)] - -checksum -[varchar(64)] + + +public.addressables +     +[BASE TABLE] + +storage_address +[varchar(200)] + +checksum +[varchar(64)] public.functions:functionaddress->public.addressables:storage_address - - -FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) + + +FOREIGN KEY (functionaddress) REFERENCES addressables(storage_address) public.functions:description->public.addressables:storage_address - - -FOREIGN KEY (description) REFERENCES addressables(storage_address) + + +FOREIGN KEY (description) REFERENCES addressables(storage_address) + + + +public.function_statuses + + +public.function_statuses +     +[BASE TABLE] + +status +[varchar(100)] + + + +public.functions:status->public.function_statuses:status + + +FOREIGN KEY (status) REFERENCES function_statuses(status) @@ -194,25 +213,25 @@ [jsonb] - + public.datamanagers:owner->public.organizations:id FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - + public.datamanagers:description->public.addressables:storage_address - - -FOREIGN KEY (description) REFERENCES addressables(storage_address) + + +FOREIGN KEY (description) REFERENCES addressables(storage_address) - + public.datamanagers:opener->public.addressables:storage_address - - -FOREIGN KEY (opener) REFERENCES addressables(storage_address) + + +FOREIGN KEY (opener) REFERENCES addressables(storage_address) @@ -257,24 +276,24 @@ [jsonb] - + public.compute_tasks:owner->public.organizations:id FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - + public.compute_tasks:worker->public.organizations:id FOREIGN KEY (worker, channel) REFERENCES organizations(id, channel) - + public.compute_tasks:function_key->public.functions:key - - + + FOREIGN KEY (function_key) REFERENCES functions(key) @@ -314,7 +333,7 @@ [timestamp with time zone] - + public.compute_tasks:compute_plan_key->public.compute_plans:key @@ -333,7 +352,7 @@ [varchar(100)] - + public.compute_tasks:status->public.compute_task_statuses:status @@ -370,28 +389,28 @@ [timestamp with time zone] - + public.models:owner->public.organizations:id - - + + FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - + public.models:compute_task_key->public.compute_tasks:key FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) - + public.models:address->public.addressables:storage_address - - + + FOREIGN KEY (address) REFERENCES addressables(storage_address) - + public.compute_plans:owner->public.organizations:id @@ -443,7 +462,7 @@ [boolean] - + public.performances:compute_task_key->public.compute_task_outputs:compute_task_key @@ -452,139 +471,158 @@ public.events - - -public.events -     -[BASE TABLE] - -id -[uuid] - -asset_key -[varchar(100)] - -channel -[varchar(100)] - -asset_kind -[varchar(50)] - -event_kind -[varchar(50)] - -timestamp -[timestamp with time zone] - -metadata -[jsonb] - -asset -[jsonb] - -position -[bigint] + + +public.events +     +[BASE TABLE] + +id +[uuid] + +asset_key +[varchar(100)] + +channel +[varchar(100)] + +asset_kind +[varchar(50)] + +event_kind +[varchar(50)] + +timestamp +[timestamp with time zone] + +metadata +[jsonb] + +asset +[jsonb] + +position +[bigint] public.asset_kinds - - -public.asset_kinds -     -[BASE TABLE] - -kind -[varchar(50)] + + +public.asset_kinds +     +[BASE TABLE] + +kind +[varchar(50)] - + public.events:asset_kind->public.asset_kinds:kind - - -FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) public.event_kinds - - -public.event_kinds -     -[BASE TABLE] - -kind -[varchar(50)] + + +public.event_kinds +     +[BASE TABLE] + +kind +[varchar(50)] - + public.events:event_kind->public.event_kinds:kind - - -FOREIGN KEY (event_kind) REFERENCES event_kinds(kind) + + +FOREIGN KEY (event_kind) REFERENCES event_kinds(kind) public.failure_reports - - -public.failure_reports -     -[BASE TABLE] - -asset_key -[uuid] - -channel -[varchar(100)] - -error_type -[varchar(50)] - -logs_address -[varchar(200)] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -asset_type -[failed_asset_kind] + + +public.failure_reports +     +[BASE TABLE] + +asset_key +[uuid] + +channel +[varchar(100)] + +error_type +[varchar(50)] + +logs_address +[varchar(200)] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +asset_type +[varchar(100)] - + public.failure_reports:owner->public.organizations:id - - -FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) + + +FOREIGN KEY (owner, channel) REFERENCES organizations(id, channel) - + public.failure_reports:logs_address->public.addressables:storage_address - - -FOREIGN KEY (logs_address) REFERENCES addressables(storage_address) + + +FOREIGN KEY (logs_address) REFERENCES addressables(storage_address) public.error_types - - -public.error_types -     -[BASE TABLE] - -error_type -[varchar(50)] + + +public.error_types +     +[BASE TABLE] + +error_type +[varchar(50)] - + public.failure_reports:error_type->public.error_types:error_type - - -FOREIGN KEY (error_type) REFERENCES error_types(error_type) + + +FOREIGN KEY (error_type) REFERENCES error_types(error_type) + + + +public.failed_asset_kinds + + +public.failed_asset_kinds +     +[BASE TABLE] + +kind +[varchar(100)] + + + +public.failure_reports:asset_type->public.failed_asset_kinds:kind + + +FOREIGN KEY (asset_type) REFERENCES failed_asset_kinds(kind) @@ -605,168 +643,168 @@ [integer] - + public.compute_task_parents:child_task_key->public.compute_tasks:key -FOREIGN KEY (child_task_key) REFERENCES compute_tasks(key) +FOREIGN KEY (child_task_key) REFERENCES compute_tasks(key) - + public.compute_task_parents:parent_task_key->public.compute_tasks:key -FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) +FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) public.expanded_datamanagers - - -public.expanded_datamanagers -     -[VIEW] - -key -[uuid] - -name -[varchar(100)] - -owner -[varchar(100)] - -channel -[varchar(100)] - -permissions -[jsonb] - -description_address -[varchar(200)] - -description_checksum -[varchar(64)] - -opener_address -[varchar(200)] - -opener_checksum -[varchar(64)] - -type -[varchar(30)] - -creation_date -[timestamp with time zone] - -logs_permission -[jsonb] - -metadata -[jsonb] + + +public.expanded_datamanagers +     +[VIEW] + +key +[uuid] + +name +[varchar(100)] + +owner +[varchar(100)] + +channel +[varchar(100)] + +permissions +[jsonb] + +description_address +[varchar(200)] + +description_checksum +[varchar(64)] + +opener_address +[varchar(200)] + +opener_checksum +[varchar(64)] + +type +[varchar(30)] + +creation_date +[timestamp with time zone] + +logs_permission +[jsonb] + +metadata +[jsonb] public.datasample_datamanagers - - -public.datasample_datamanagers -     -[BASE TABLE] - -datasample_key -[uuid] - -datamanager_key -[uuid] + + +public.datasample_datamanagers +     +[BASE TABLE] + +datasample_key +[uuid] + +datamanager_key +[uuid] - + public.datasample_datamanagers:datasample_key->public.datasamples:key - - -FOREIGN KEY (datasample_key) REFERENCES datasamples(key) + + +FOREIGN KEY (datasample_key) REFERENCES datasamples(key) - + public.datasample_datamanagers:datamanager_key->public.datamanagers:key - - -FOREIGN KEY (datamanager_key) REFERENCES datamanagers(key) + + +FOREIGN KEY (datamanager_key) REFERENCES datamanagers(key) public.function_inputs - - -public.function_inputs -     -[BASE TABLE] - -function_key -[uuid] - -identifier -[varchar(100)] - -kind -[varchar(50)] - -multiple -[boolean] - -optional -[boolean] + + +public.function_inputs +     +[BASE TABLE] + +function_key +[uuid] + +identifier +[varchar(100)] + +kind +[varchar(50)] + +multiple +[boolean] + +optional +[boolean] - + public.function_inputs:function_key->public.functions:key - - -FOREIGN KEY (function_key) REFERENCES functions(key) + + +FOREIGN KEY (function_key) REFERENCES functions(key) - + public.function_inputs:kind->public.asset_kinds:kind - - -FOREIGN KEY (kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (kind) REFERENCES asset_kinds(kind) public.function_outputs - - -public.function_outputs -     -[BASE TABLE] - -function_key -[uuid] - -identifier -[varchar(100)] - -kind -[varchar(50)] - -multiple -[boolean] + + +public.function_outputs +     +[BASE TABLE] + +function_key +[uuid] + +identifier +[varchar(100)] + +kind +[varchar(50)] + +multiple +[boolean] - + public.function_outputs:function_key->public.functions:key - - -FOREIGN KEY (function_key) REFERENCES functions(key) + + +FOREIGN KEY (function_key) REFERENCES functions(key) - + public.function_outputs:kind->public.asset_kinds:kind - - -FOREIGN KEY (kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (kind) REFERENCES asset_kinds(kind) @@ -796,201 +834,201 @@ [varchar(100)] - + public.compute_task_inputs:compute_task_key->public.compute_tasks:key FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) - + public.compute_task_inputs:parent_task_key->public.compute_tasks:key FOREIGN KEY (parent_task_key) REFERENCES compute_tasks(key) - + public.compute_task_outputs:compute_task_key->public.compute_tasks:key -FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) +FOREIGN KEY (compute_task_key) REFERENCES compute_tasks(key) public.compute_task_output_assets - - -public.compute_task_output_assets -     -[BASE TABLE] - -compute_task_key -[uuid] - -compute_task_output_identifier -[varchar(100)] - -position -[integer] - -asset_kind -[varchar(50)] - -asset_key -[varchar(200)] + + +public.compute_task_output_assets +     +[BASE TABLE] + +compute_task_key +[uuid] + +compute_task_output_identifier +[varchar(100)] + +position +[integer] + +asset_kind +[varchar(50)] + +asset_key +[varchar(200)] - + public.compute_task_output_assets:asset_kind->public.asset_kinds:kind - - -FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) + + +FOREIGN KEY (asset_kind) REFERENCES asset_kinds(kind) - + public.compute_task_output_assets:compute_task_key->public.compute_task_outputs:compute_task_key - - -FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) + + +FOREIGN KEY (compute_task_key, compute_task_output_identifier) REFERENCES compute_task_outputs(compute_task_key, identifier) public.expanded_models - - -public.expanded_models -     -[VIEW] - -key -[uuid] - -compute_task_key -[uuid] - -address -[varchar(200)] - -checksum -[varchar(64)] - -permissions -[jsonb] - -owner -[varchar(100)] - -channel -[varchar(100)] - -creation_date -[timestamp with time zone] + + +public.expanded_models +     +[VIEW] + +key +[uuid] + +compute_task_key +[uuid] + +address +[varchar(200)] + +checksum +[varchar(64)] + +permissions +[jsonb] + +owner +[varchar(100)] + +channel +[varchar(100)] + +creation_date +[timestamp with time zone] public.expanded_datasamples - - -public.expanded_datasamples -     -[VIEW] - -key -[uuid] - -owner -[varchar(100)] - -channel -[varchar(100)] - -checksum -[varchar(64)] - -creation_date -[timestamp with time zone] - -datamanager_keys -[jsonb] + + +public.expanded_datasamples +     +[VIEW] + +key +[uuid] + +owner +[varchar(100)] + +channel +[varchar(100)] + +checksum +[varchar(64)] + +creation_date +[timestamp with time zone] + +datamanager_keys +[jsonb] - + public.expanded_functions - - -public.expanded_functions -     -[VIEW] - -key -[uuid] - -name -[varchar(100)] - -description_address -[varchar(200)] - -description_checksum -[varchar(64)] - -function_address -[varchar(200)] - -function_checksum -[varchar(64)] - -permissions -[jsonb] - -owner -[varchar(100)] - -creation_date -[timestamp with time zone] - -metadata -[jsonb] - -channel -[varchar(100)] - -status -[function_status] + + +public.expanded_functions +     +[VIEW] + +key +[uuid] + +name +[varchar(100)] + +description_address +[varchar(200)] + +description_checksum +[varchar(64)] + +function_address +[varchar(200)] + +function_checksum +[varchar(64)] + +permissions +[jsonb] + +owner +[varchar(100)] + +creation_date +[timestamp with time zone] + +metadata +[jsonb] + +channel +[varchar(100)] + +status +[varchar(100)] - + public.expanded_failure_reports - - -public.expanded_failure_reports -     -[VIEW] - -asset_key -[uuid] - -asset_type -[failed_asset_kind] - -error_type -[varchar(50)] - -logs_address -[varchar(200)] - -logs_checksum -[varchar(64)] - -creation_date -[timestamp with time zone] - -owner -[varchar(100)] - -channel -[varchar(100)] + + +public.expanded_failure_reports +     +[VIEW] + +asset_key +[uuid] + +asset_type +[varchar(100)] + +error_type +[varchar(50)] + +logs_address +[varchar(200)] + +logs_checksum +[varchar(64)] + +creation_date +[timestamp with time zone] + +owner +[varchar(100)] + +channel +[varchar(100)]