Skip to content

Commit

Permalink
[Feature] Survey validator
Browse files Browse the repository at this point in the history
  • Loading branch information
Sispheor committed Jan 8, 2024
1 parent c095998 commit 5a9e3cd
Show file tree
Hide file tree
Showing 37 changed files with 1,078 additions and 60 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- Hide disabled operations from Instance view

## Feature

- Add SurveyValidator on Operation. It allows you to implement your own validation logic with Python on the entire form. See documentation [here](https://hewlettpackard.github.io/squest/latest/manual/advanced/survey_validators/)

# 2.4.1 2023-12-18

## Fix
Expand Down
1 change: 1 addition & 0 deletions Squest/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ def backup_filename(databasename, servername, datetime, extension, content_type)
# Plugins
# -----------------------------------------
FIELD_VALIDATOR_PATH = "plugins/field_validators"
SURVEY_VALIDATOR_PATH = "plugins/survey_validators"

# -----------------------------------------
# SQL Debug
Expand Down
67 changes: 61 additions & 6 deletions Squest/utils/plugin_controller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging
from importlib.machinery import SourceFileLoader
import inspect
import os
from pydoc import locate
import re

from django.conf import settings

Expand All @@ -9,10 +13,16 @@
logger = logging.getLogger(__name__)


def full_path_to_dotted_path(path):
# /foo/bar/myfile.py -> foo.bar.myfile
path = re.sub(r"\.py", "", path)
path = re.sub(r"/", ".", path)
return path

class PluginController:

@classmethod
def get_user_provisioned_validators(cls):
def get_user_provisioned_field_validators(cls):
filepath = settings.FIELD_VALIDATOR_PATH
file_list = os.listdir(filepath)
for forbidden_word in ["__init__.py", "__pycache__"]:
Expand All @@ -24,24 +34,69 @@ def get_user_provisioned_validators(cls):
returned_list.sort()
return returned_list

@classmethod
def get_user_provisioned_survey_validators(cls):
def is_validator(obj):
"""
Returns True if the object is a Script.
"""
from service_catalog.forms import SurveyValidator
try:
return issubclass(obj, SurveyValidator) and obj != SurveyValidator
except TypeError:
return False

def python_name(full_path):
# /foo/bar/myfile.py -> myfile
path, filename = os.path.split(full_path)
name = os.path.splitext(filename)[0]
if name == "__init__":
# File is a package
return os.path.basename(path)
else:
return name

scripts = list()
for filename in os.listdir(settings.SURVEY_VALIDATOR_PATH):
if filename in ["__init__.py", "__pycache__"]:
continue
full_path = os.path.join(settings.SURVEY_VALIDATOR_PATH, filename)
loader = SourceFileLoader(python_name(filename), full_path)
module = loader.load_module()
for name, klass in inspect.getmembers(module, is_validator):
dotted_path = f"{python_name(filename)}.{klass.__name__}"
scripts.append(dotted_path)
return scripts

@classmethod
def get_ui_field_validator_def(cls, validator_file):
return cls._load_validator_module(module_name=validator_file, definition_kind=VALIDATE_UI_DEFINITION_NAME)
return cls._load_field_validator_module(module_name=validator_file, definition_kind=VALIDATE_UI_DEFINITION_NAME)

@classmethod
def get_api_field_validator_def(cls, validator_file):
return cls._load_validator_module(module_name=validator_file, definition_kind=VALIDATE_API_DEFINITION_NAME)
return cls._load_field_validator_module(module_name=validator_file,
definition_kind=VALIDATE_API_DEFINITION_NAME)

@classmethod
def _load_validator_module(cls, module_name, definition_kind):
def get_survey_validator_def(cls, validator_path):
return locate(f"{full_path_to_dotted_path(settings.SURVEY_VALIDATOR_PATH)}.{validator_path}")

@classmethod
def _load_field_validator_module(cls, module_name, definition_kind):
logger.warning("Deprecation warning: Please switch to SurveyValidator")
filepath = settings.FIELD_VALIDATOR_PATH
return cls._load_validator_module(module_name, definition_kind, filepath)

@staticmethod
def _load_validator_module(module_name, definition_kind, filepath):
"""
Dynamically load a validator definition from a python file
:param module_name: name of the python file that contains field validator definitions
:param module_name: name of the python file that contains validator definitions
:param definition_kind: UI or API
:return:
"""
try:
mod = __import__(f"{settings.FIELD_VALIDATOR_PATH.replace('/','.')}.{module_name}",
mod = __import__(f"{filepath.replace('/', '.')}.{module_name}",
fromlist=[definition_kind])
klass = getattr(mod, definition_kind)
return klass
Expand Down
5 changes: 3 additions & 2 deletions Squest/utils/squest_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ def __init__(self, *args, **kwargs):
def is_valid(self):
returned_value = super(SquestForm, self).is_valid()
for field_name in self.errors.keys():
current_class = self.fields.get(field_name).widget.attrs.get('class', '')
self.fields.get(field_name).widget.attrs['class'] = f"{current_class} {self.error_css_class}"
if field_name != "__all__":
current_class = self.fields.get(field_name).widget.attrs.get('class', '')
self.fields.get(field_name).widget.attrs['class'] = f"{current_class} {self.error_css_class}"
return returned_value

def beautify(self):
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ services:
- django_static:/app/static
- django_media:/app/media
- backup:/app/backup
- ./plugins/field_validators:/app/plugins/field_validators
- ./plugins:/app/plugins
depends_on:
- db
- rabbitmq
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/squest_settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,20 @@ Change the format of all date in Squest UI. Based on Python [strftime](https://s

### FIELD_VALIDATOR_PATH

!!!warning

FIELD_VALIDATOR_PATH is now deprecated. Please use [SURVEY_VALIDATOR_PATH](#survey_validator_path) instead.

**Default:** `plugins/field_validators`

Path to form field validation modules.

### SURVEY_VALIDATOR_PATH

**Default:** `plugins/survey_validators`

Path to SurveyValidator modules.

## Redis

### REDIS_CACHE_USER
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Field validators

!!!warning

Field validators feature is now deprecated. Please use [survey validator](survey_validators.md) instead.

Field validators are Python modules that can be added as plugin to perform custom checks on an [operation survey field](../service_catalog/operation.md#survey).

## Create a field validator
Expand Down
89 changes: 89 additions & 0 deletions docs/manual/advanced/survey_validators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Survey validators

Survey validators are Python modules that can be added as plugin. It allows users to implement their own validation
logic on a day1 or day2 operation against the full survey.

## Creating survey validator

Create a Python file in **SURVEY_VALIDATOR_PATH** (default is `plugins/survey_validators`).
Create Python class that inherit from SurveyValidator with a method `validate_survey`.

```python
# plugins/survey_validators/MySurveyValidator.py
from service_catalog.forms import SurveyValidator

class MyCustomValidatorFoo(SurveyValidator):
def validate_survey(self):
# Implement your own logic here
pass
```

## SurveyValidator attributes

### survey

This is a dict containing survey + request_comment. Keys are variable name.
type: dict

```bash
>>> print(self.survey)
{
"request_comment": "commentary sent by user"
"ram_gb": 8,
"vcpu": 4
}
```

### user

User requesting operation.
type: django.contrib.auth.models.User

### operation

Operation requested.
type: service_catalog.models.Operation

### instance

Instance targeted.
type: service_catalog.models.Instance

!!!note
For day 1 operation `self.instance` is a FakeInstance object that contains only **name** and **quota_scope** without `save` method.
The real Instance object is created after validation.

## SurveyValidator method

### validate_survey(self)

Redefine it to implement your own logic.

### fail(self, message, field="\_\_all\_\_")

Raise an exception and display message on UI/API.

## Set validator to a form field

In Squest, edit an Operation to set validators. Multiples validators can be added, validators are executed in alphabetical order by script name and class name.

## Example

This validator will always fail if:

- ram and cpu are both equal 1
- It's not the weekend yet

```python
from service_catalog.forms import SurveyValidator
import datetime

class ValidatorForVM(SurveyValidator):
def validate_survey(self):
if self.survey.get("ram") == 1 and self.survey.get("vcpu") == 1:
self.fail("Forbidden: you cannot use ram=1 and cpu=1")

weekday = datetime.datetime.today().weekday()
if weekday < 5:
self.fail("Sorry it's not the weekend yet")
```
4 changes: 2 additions & 2 deletions docs/manual/service_catalog/survey.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ Full `instance` and `user `object definition can be retrieved through the [API d

## Validators

Field validators are python modules that can be added as plugin to perform a custom check on a form field.
See related [documentation here](../advanced/validators.md).
SurveyValidator are python modules that can be added as plugins to perform a custom check on your form.
See related [documentation here](../advanced/survey_validators.md).


## Attribute definition
Expand Down
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ nav:
- Advanced:
- manual/advanced/filters.md
- manual/advanced/jinja.md
- manual/advanced/validators.md
- manual/advanced/field_validators.md
- manual/advanced/survey_validators.md
- manual/advanced/ldap.md
- Notifications: manual/notifications.md
- Administration:
Expand Down
Empty file.
76 changes: 60 additions & 16 deletions service_catalog/api/serializers/request_serializers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from json import dumps, loads

from rest_framework.exceptions import ValidationError
from rest_framework.exceptions import ValidationError, PermissionDenied
from rest_framework.relations import PrimaryKeyRelatedField
from rest_framework.serializers import ModelSerializer, CharField

from Squest.utils.squest_encoder import SquestEncoder
from profiles.api.serializers.user_serializers import UserSerializerNested
from profiles.models import Scope
from service_catalog.api.serializers import DynamicSurveySerializer, InstanceReadSerializer
from service_catalog.models.instance import Instance
from service_catalog.models import InstanceState, OperationType


from service_catalog.models import Instance, FakeInstance
from service_catalog.models.message import RequestMessage
from service_catalog.models.request import Request

Expand Down Expand Up @@ -41,21 +44,32 @@ def __init__(self, *args, **kwargs):
def validate(self, data):
super(ServiceRequestSerializer, self).validate(data)
quota_scope = data.get("quota_scope")
fill_in_survey = data.get("fill_in_survey")
fill_in_survey = data.get("fill_in_survey", {})
request_comment = data.get("request_comment")
# validate the quota if set on one of the fill_in_survey
if fill_in_survey is not None:
for field_name, value in fill_in_survey.items():
# get the tower field
tower_field = self.operation.tower_survey_fields.get(variable=field_name)
if tower_field.attribute_definition is not None:
# try to find the field in the quota linked to the scope
quota_set_on_attribute = quota_scope.quotas.filter(attribute_definition=tower_field.attribute_definition)
if quota_set_on_attribute.exists():
quota_set_on_attribute = quota_set_on_attribute.first()
if value > quota_set_on_attribute.available:
raise ValidationError({"fill_in_survey":
f"Quota limit reached on '{field_name}'. "
f"Available: {quota_set_on_attribute.available}"})
for field_name, value in fill_in_survey.items():
# get the tower field
tower_field = self.operation.tower_survey_fields.get(variable=field_name)
if tower_field.attribute_definition is not None:
# try to find the field in the quota linked to the scope
quota_set_on_attribute = quota_scope.quotas.filter(
attribute_definition=tower_field.attribute_definition)
if quota_set_on_attribute.exists():
quota_set_on_attribute = quota_set_on_attribute.first()
if value > quota_set_on_attribute.available:
raise ValidationError({"fill_in_survey":
f"Quota limit reached on '{field_name}'. "
f"Available: {quota_set_on_attribute.available}"})
fill_in_survey.update({"request_comment": request_comment})
for validators in self.operation.get_validators():
# load dynamically the user provided validator
validators(
survey=fill_in_survey,
user=self.user,
operation=self.operation,
instance=FakeInstance(quota_scope=quota_scope, name=data.get("squest_instance_name")),
form=None
)._validate()
return data

def save(self):
Expand Down Expand Up @@ -117,6 +131,36 @@ def save(self, **kwargs):
send_mail_request_update(target_request=new_request, user_applied_state=new_request.user, message=message)
return new_request

def validate(self, data):
super(OperationRequestSerializer, self).validate(data)

if self.operation.is_admin_operation and not self.user.has_perm("service_catalog.admin_request_on_instance"):
raise PermissionDenied
if not self.operation.is_admin_operation and not self.user.has_perm("service_catalog.request_on_instance"):
raise PermissionDenied
if self.squest_instance.state not in [InstanceState.AVAILABLE]:
raise PermissionDenied("Instance not available")
if self.operation.enabled is False:
raise PermissionDenied(f"Operation is not enabled.")
if self.operation.service.id != self.squest_instance.service.id:
raise PermissionDenied("Operation service and instance service doesn't match")
if self.operation.type not in [OperationType.UPDATE, OperationType.DELETE]:
raise PermissionDenied("Operation type UPDATE and DELETE only")
fill_in_survey = data.get("fill_in_survey")
request_comment = data.get("request_comment")
fill_in_survey.update({"request_comment": request_comment})

for validators in self.operation.get_validators():
# load dynamically the user provided validator
validators(
survey=fill_in_survey,
user=self.user,
operation=self.operation,
instance=self.squest_instance,
form=None
)._validate()
return data


class RequestSerializer(ModelSerializer):
class Meta:
Expand Down
Loading

0 comments on commit 5a9e3cd

Please sign in to comment.