Skip to content

Commit

Permalink
Fixed issues with parameter context and allowed cluster specific comp…
Browse files Browse the repository at this point in the history
…onent access policies (#31)

* Add component exists check for uncoordinated compnents
* Remove client id from revision property when updating or deleting process groups
* Revert "Remove client id from revision property when updating or deleting process groups"
* Override parameter contexts are now applied correctly
* Refactor override parameter context fix
* Allow cluster specific component access policies (#30)
  • Loading branch information
gsingh395 authored Nov 18, 2020
1 parent 550cd33 commit 98c7e19
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 28 deletions.
5 changes: 4 additions & 1 deletion conf/nifi-cluster-coordinator.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ parameter_contexts:
# Component policy notes:
# - All component policies are overrides
# - Set 'inherited' property to 'true' to delete an override
# - Set 'clusters' array for cluster specific component policies
#-----------------------------------------------------------------
security:
is_coordinated: true
Expand Down Expand Up @@ -174,4 +175,6 @@ security:
- name: 'modify the component'
component_type: 'environment'
component_name: 'foo-2-cluster:production'
inherited: true
inherited: true
clusters:
- 'foo-cluster'
14 changes: 12 additions & 2 deletions nifi_cluster_coordinator/configuration/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ def __init__(self, name: str, action: str, users: list, user_groups: list):

class ComponentAccessPolicy:

def __init__(self, name: str, component_type: str, component_name: str, users: list, user_groups: list, inherited: bool):
def __init__(
self, name: str,
component_type: str,
component_name: str,
users: list,
user_groups: list,
inherited: bool,
clusters: list
):
self.name = name
self.component_type = component_type
self.component_name = component_name
self.users = users if not(users is None) else []
self.user_groups = user_groups if not(user_groups is None) else []
self.inherited = inherited
self.clusters = clusters if not(clusters is None) else []


class Security:
Expand Down Expand Up @@ -76,7 +85,8 @@ def __init__(self, security: dict):
component_name=cap['component_name'],
users=cap['users'] if 'users' in cap else [],
user_groups=cap['user_groups'] if 'user_groups' in cap else [],
inherited=cap['inherited'] if 'inherited' in cap else False)
inherited=cap['inherited'] if 'inherited' in cap else False,
clusters=cap['clusters'] if 'clusters' in cap else [])
for cap in security['component_access_policies']
] if 'component_access_policies' in security and not(security['component_access_policies'] is None) else []

Expand Down
7 changes: 6 additions & 1 deletion nifi_cluster_coordinator/services/access_policy_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ def sync_component_policies(cluster: Cluster, security: Security, configured_pro
configured_access_policies = [
a for a in security.component_access_policies
if a.name.lower() == access_policy_descriptor.name.lower()
and (
len(a.clusters) == 0
or len([c for c in a.clusters if c.lower() == cluster.name.lower()]) > 0
)
]

if access_policy_descriptor.required_by_coordinator:
Expand All @@ -163,7 +167,8 @@ def sync_component_policies(cluster: Cluster, security: Security, configured_pro
component_name='root',
users=[current_user_json['component']['identity']],
user_groups=[],
inherited=False))
inherited=False,
clusters=[]))
for access_policy in configured_access_policies:
if len([u for u in access_policy.users if u.lower() == current_user_json['component']['identity'].lower()]) == 0:
access_policy.users.append(current_user_json['component']['identity'])
Expand Down
56 changes: 33 additions & 23 deletions nifi_cluster_coordinator/services/environment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def sync(cluster: Cluster, project: Project, project_cluster: ProjectCluster, pa

uncoordinated_environments = list(filter(lambda e: not e.is_coordinated, project_cluster.environments))
for environment in uncoordinated_environments:
environment.process_group_id = current_environments_json_dict[environment.name]['id']
if environment.name in current_environments_json_dict:
environment.process_group_id = current_environments_json_dict[environment.name]['id']


def _create(
Expand Down Expand Up @@ -64,16 +65,18 @@ def _create(

if environment.parameter_context_name:
parameter_context = _get_parameter_context_by_name(environment.parameter_context_name, parameter_contexts)
if parameter_context is None or parameter_context.id is None:
if parameter_context is None or (parameter_context.is_coordinated and parameter_context.id is None):
logger.warning(f'Unable to find parameter context: {environment.parameter_context_name}, for project: {project.name}, environment: {environment.name}.')
return
create_json['component']['parameterContext'] = {
'id': parameter_context.id,
'component': {

if not (parameter_context.id is None):
create_json['component']['parameterContext'] = {
'id': parameter_context.id,
'name': environment.parameter_context_name
'component': {
'id': parameter_context.id,
'name': environment.parameter_context_name
}
}
}

try:
response = requests.post(**cluster._get_connection_details(post_url), json=create_json)
Expand Down Expand Up @@ -128,44 +131,52 @@ def _update(
)

):
environment_json['component']['name'] = environment.name
environment_json['component']['comments'] = environment.description

if 'parameterContext' in environment_json['component']:
del environment_json['component']['parameterContext']
update_json = {
'revision': environment_json['revision'],
'component': {
'id': environment_json['id'],
'name': environment.name,
'comments': environment.description,
}
}

if environment.parameter_context_name:
parameter_context = _get_parameter_context_by_name(environment.parameter_context_name, parameter_contexts)
if parameter_context is None or parameter_context.id is None:
if parameter_context is None or (parameter_context.is_coordinated and parameter_context.id is None):
logger.warning(f'Unable to find parameter context: {environment.parameter_context_name}, for project: {project.name}, environment: {environment.name}, in cluster: {cluster.name}.')
return
environment_json['component']['parameterContext'] = {
'id': parameter_context.id,
'component': {

if not (parameter_context.id is None):
update_json['component']['parameterContext'] = {
'id': parameter_context.id,
'name': environment.parameter_context_name
'component': {
'id': parameter_context.id,
'name': environment.parameter_context_name
}
}
}

# remove version control to update
if 'versionControlInformation' in environment_json['component']:
try:
delete_version_url = '/' + url_helper.construct_path_parts(['versions', 'process-groups', environment_json['id']])
response = requests.delete(
**cluster._get_connection_details(delete_version_url),
params={'clientId': environment_json['revision']['clientId'], 'version': str(environment_json['revision']['version'])})
**cluster._get_connection_details(delete_version_url), params=environment_json['revision'])
if response.status_code != 200:
logger.warning(response.text)
return
update_version = True
# update json with new revision
delete_version_response_json = response.json()
if 'processGroupRevision' in delete_version_response_json:
update_json['revision'] = delete_version_response_json['processGroupRevision']
except requests.exceptions.RequestException as exception:
logger.warning(f'Unable to temporarily delete version control from project: {project.name}, environment: {environment.name}, in cluster: {cluster.name}.')
logger.warning(exception)
return

try:
put_url = '/' + url_helper.construct_path_parts(['process-groups', environment_json['id']])
response = requests.put(**cluster._get_connection_details(put_url), json=environment_json)
response = requests.put(**cluster._get_connection_details(put_url), json=update_json)
if response.status_code != 200:
logger.warning(response.text)
return
Expand All @@ -189,8 +200,7 @@ def _delete(cluster: Cluster, project: Project, delete_environment_json):
delete_url = '/' + url_helper.construct_path_parts(['process-groups', delete_environment_json['component']['id']])
try:
response = requests.delete(
**cluster._get_connection_details(delete_url),
params={'clientId': delete_environment_json['revision']['clientId'], 'version': str(delete_environment_json['revision']['version'])})
**cluster._get_connection_details(delete_url), params=delete_environment_json['revision'])
if response.status_code != 200:
logger.warning(response.text)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def sync(cluster: Cluster, configured_parameter_contexts: list):

uncoordinated_parameter_contexts = list(filter(lambda pc: not pc.is_coordinated, configured_parameter_contexts))
for parameter_context in uncoordinated_parameter_contexts:
parameter_context.id = current_parameter_contexts_json_dict[parameter_context.name]['id']
if parameter_context.name in current_parameter_contexts_json_dict:
parameter_context.id = current_parameter_contexts_json_dict[parameter_context.name]['id']


def _create(cluster: Cluster, parameter_context: ParameterContext):
Expand Down

0 comments on commit 98c7e19

Please sign in to comment.