Skip to content

Commit

Permalink
Merge pull request #97 from JVickery-TBS/feature/ipipe-interafce
Browse files Browse the repository at this point in the history
IPipeValidation Interface
  • Loading branch information
duttonw authored Dec 9, 2024
2 parents ecf8bdd + b155a94 commit 8003464
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 32 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: '3.7'
python-version: '3.9'
- name: Install requirements
run: pip install flake8 pycodestyle
- name: Check syntax
Expand All @@ -26,7 +26,7 @@ jobs:
image: openknowledge/ckan-dev:${{ matrix.ckan-version }}
services:
solr:
image: ckan/ckan-solr-dev:${{ matrix.ckan-version }}
image: ckan/ckan-solr:${{ matrix.ckan-version }}
postgres:
image: ckan/ckan-postgres-dev:${{ matrix.ckan-version }}
env:
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,21 @@ class IDataValidation(Interface):
return True
```

The plugin also provides the `IPipeValidation` interface, which lets other plugins receive the dictized validation report in a data-pipeline fashion. This allows plugins to perform follow-up actions once a validation job has finished.

Example:
```
import ckan.plugins as plugins
from ckanext.validation.interfaces import IPipeValidation
class MyPlugin(plugins.SingletonPlugin):
    """Example plugin that reacts to finished validation jobs."""

    plugins.implements(IPipeValidation)

    def receive_validation_report(self, validation_report):
        """Handle the dictized validation report passed in by the extension."""
        if validation_report.get('status') == 'success':
            # Do something when the resource successfully passes validation
            pass
```

## Action functions

The `validation` plugin adds new API actions to create and display validation
Expand Down
18 changes: 18 additions & 0 deletions ckanext/validation/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ def can_validate(self, context, data_dict):
'''
return True


class IPipeValidation(Interface):
    """
    Receive validation reports as a step in a Data Pipeline.

    Plugins implementing this interface are handed the dictized validation
    report that ``send_validation_report`` broadcasts after a validation
    job has run. A subscriber can use the report as a trigger for further
    processing (for example transforming the data to another format or
    loading it into a datastore), so that several plugins can be linked up
    in sequence to form a data processing pipeline.
    """

    def receive_validation_report(self, validation_report):
        """
        Handle a validation report broadcast by the validation extension.

        :param validation_report: the dictized validation; as produced by
            ``validation_dictize`` it carries the keys ``id``,
            ``resource_id``, ``status``, ``report``, ``error``, ``created``
            and ``finished``.

        The default implementation does nothing. Exceptions raised by an
        implementation are logged and re-raised by the sender.
        """
7 changes: 6 additions & 1 deletion ckanext/validation/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import ckantoolkit as t

from ckanext.validation.model import Validation
from ckanext.validation.utils import get_update_mode_from_config
from ckanext.validation.utils import (
get_update_mode_from_config,
send_validation_report,
validation_dictize,
)


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -132,6 +136,7 @@ def run_validation_job(resource):
'_validation_performed': True
}
t.get_action('resource_patch')(patch_context, data_dict)
send_validation_report(validation_dictize(validation))



Expand Down
19 changes: 2 additions & 17 deletions ckanext/validation/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
get_create_mode_from_config,
get_update_mode_from_config,
delete_local_uploaded_file,
validation_dictize,
)


Expand Down Expand Up @@ -172,7 +173,7 @@ def resource_validation_show(context, data_dict):
raise t.ObjectNotFound(
'No validation report exists for this resource')

return _validation_dictize(validation)
return validation_dictize(validation)


def resource_validation_delete(context, data_dict):
Expand Down Expand Up @@ -404,22 +405,6 @@ def _add_default_formats(search_data_dict):
search_data_dict['fq_list'].append(' OR '.join(filter_formats_query))


def _validation_dictize(validation):
    """
    Convert a validation object into a plain dictionary.

    The ``id``, ``resource_id``, ``status``, ``report`` and ``error``
    attributes are copied as they are; ``created`` and ``finished`` are
    rendered with ``isoformat()``, or set to ``None`` when falsy.
    """
    result = {
        field: getattr(validation, field)
        for field in ('id', 'resource_id', 'status', 'report', 'error')
    }
    # Timestamps are optional, so only format the ones that are set.
    for stamp in ('created', 'finished'):
        moment = getattr(validation, stamp)
        result[stamp] = moment.isoformat() if moment else None

    return result


@t.chained_action
def resource_create(up_func, context, data_dict):
'''Appends a new resource to a datasets list of resources.
Expand Down
30 changes: 18 additions & 12 deletions ckanext/validation/tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,9 @@ def test_run_non_auth_user(self):
user = factories.User()
org = factories.Organization()
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()]
owner_org=org["id"]
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

Expand All @@ -357,7 +358,7 @@ def test_run_non_auth_user(self):
call_auth,
"resource_validation_run",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)

def test_run_auth_user(self):
Expand All @@ -367,16 +368,17 @@ def test_run_auth_user(self):
users=[{"name": user["name"], "capacity": "editor"}]
)
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()]
owner_org=org["id"]
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

assert (
call_auth(
"resource_validation_run",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)
is True
)
Expand Down Expand Up @@ -416,8 +418,9 @@ def test_delete_non_auth_user(self):
user = factories.User()
org = factories.Organization()
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()]
owner_org=org["id"]
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

Expand All @@ -426,7 +429,7 @@ def test_delete_non_auth_user(self):
call_auth,
"resource_validation_delete",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)

def test_delete_auth_user(self):
Expand All @@ -436,16 +439,17 @@ def test_delete_auth_user(self):
users=[{"name": user["name"], "capacity": "editor"}]
)
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()]
owner_org=org["id"]
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

assert (
call_auth(
"resource_validation_delete",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)
is True
)
Expand All @@ -468,16 +472,17 @@ def test_show_anon_public_dataset(self):
user = factories.User()
org = factories.Organization()
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()], private=False
owner_org=org["id"], private=False
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

assert (
call_auth(
"resource_validation_show",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)
is True
)
Expand All @@ -487,8 +492,9 @@ def test_show_anon_private_dataset(self):
user = factories.User()
org = factories.Organization()
dataset = factories.Dataset(
owner_org=org["id"], resources=[factories.Resource()], private=True
owner_org=org["id"], private=True
)
resource = factories.Resource(package_id=dataset["id"])

context = {"user": user["name"], "model": model}

Expand All @@ -497,7 +503,7 @@ def test_show_anon_private_dataset(self):
call_auth,
"resource_validation_run",
context=context,
resource_id=dataset["resources"][0]["id"],
resource_id=resource["id"],
)


Expand Down
30 changes: 30 additions & 0 deletions ckanext/validation/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import os
import logging

from ckan.plugins import PluginImplementations
from ckan.lib.uploader import ResourceUpload
from ckantoolkit import config, asbool

from ckanext.validation.interfaces import IPipeValidation


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,3 +74,30 @@ def delete_local_uploaded_file(resource_id):

except OSError as e:
log.warning(u'Error deleting uploaded file: %s', e)


def validation_dictize(validation):
    """
    Convert a validation object into a plain dictionary.

    :param validation: object exposing ``id``, ``resource_id``, ``status``,
        ``report``, ``error``, ``created`` and ``finished`` attributes
    :returns: dict with those same keys; ``created`` and ``finished`` are
        ISO 8601 strings, or ``None`` when the attribute is falsy
    """
    def _iso_or_none(moment):
        # Timestamps may be unset, e.g. while a job has not finished yet.
        return moment.isoformat() if moment else None

    return {
        'id': validation.id,
        'resource_id': validation.resource_id,
        'status': validation.status,
        'report': validation.report,
        'error': validation.error,
        'created': _iso_or_none(validation.created),
        'finished': _iso_or_none(validation.finished),
    }


def send_validation_report(validation_report):
    """
    Broadcast a validation report to every ``IPipeValidation`` plugin.

    Each implementation's ``receive_validation_report`` is called in turn
    with the same ``validation_report`` object.

    :param validation_report: the dictized validation report to hand to
        the subscribed plugins
    :raises Exception: whatever a plugin raised; the error is logged and
        re-raised, so plugins later in the iteration are not notified
    """
    for observer in PluginImplementations(IPipeValidation):
        try:
            observer.receive_validation_report(validation_report)
        except Exception:
            # Name the failing plugin in the log; log.exception() already
            # appends the traceback, so the exception object itself does
            # not need to be used as the message.
            log.exception(
                u'Plugin %s failed to process the validation report',
                type(observer).__name__)
            # We reraise all exceptions so it is obvious that something
            # is wrong
            raise

0 comments on commit 8003464

Please sign in to comment.