Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issues with parameter context and allowed cluster specific component access policies #31

Merged
merged 6 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion conf/nifi-cluster-coordinator.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ parameter_contexts:
# Component policy notes:
# - All component policies are overrides
# - Set 'inherited' property to 'true' to delete an override
# - Set 'clusters' array for cluster specific component policies
#-----------------------------------------------------------------
security:
is_coordinated: true
Expand Down Expand Up @@ -174,4 +175,6 @@ security:
- name: 'modify the component'
component_type: 'environment'
component_name: 'foo-2-cluster:production'
inherited: true
inherited: true
clusters:
- 'foo-cluster'
14 changes: 12 additions & 2 deletions nifi_cluster_coordinator/configuration/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ def __init__(self, name: str, action: str, users: list, user_groups: list):

class ComponentAccessPolicy:

def __init__(self, name: str, component_type: str, component_name: str, users: list, user_groups: list, inherited: bool):
def __init__(
self, name: str,
component_type: str,
component_name: str,
users: list,
user_groups: list,
inherited: bool,
clusters: list
):
self.name = name
self.component_type = component_type
self.component_name = component_name
self.users = users if not(users is None) else []
self.user_groups = user_groups if not(user_groups is None) else []
self.inherited = inherited
self.clusters = clusters if not(clusters is None) else []


class Security:
Expand Down Expand Up @@ -76,7 +85,8 @@ def __init__(self, security: dict):
component_name=cap['component_name'],
users=cap['users'] if 'users' in cap else [],
user_groups=cap['user_groups'] if 'user_groups' in cap else [],
inherited=cap['inherited'] if 'inherited' in cap else False)
inherited=cap['inherited'] if 'inherited' in cap else False,
clusters=cap['clusters'] if 'clusters' in cap else [])
for cap in security['component_access_policies']
] if 'component_access_policies' in security and not(security['component_access_policies'] is None) else []

Expand Down
7 changes: 6 additions & 1 deletion nifi_cluster_coordinator/services/access_policy_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ def sync_component_policies(cluster: Cluster, security: Security, configured_pro
configured_access_policies = [
a for a in security.component_access_policies
if a.name.lower() == access_policy_descriptor.name.lower()
and (
len(a.clusters) == 0
or len([c for c in a.clusters if c.lower() == cluster.name.lower()]) > 0
)
]

if access_policy_descriptor.required_by_coordinator:
Expand All @@ -163,7 +167,8 @@ def sync_component_policies(cluster: Cluster, security: Security, configured_pro
component_name='root',
users=[current_user_json['component']['identity']],
user_groups=[],
inherited=False))
inherited=False,
clusters=[]))
for access_policy in configured_access_policies:
if len([u for u in access_policy.users if u.lower() == current_user_json['component']['identity'].lower()]) == 0:
access_policy.users.append(current_user_json['component']['identity'])
Expand Down
56 changes: 33 additions & 23 deletions nifi_cluster_coordinator/services/environment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def sync(cluster: Cluster, project: Project, project_cluster: ProjectCluster, pa

uncoordinated_environments = list(filter(lambda e: not e.is_coordinated, project_cluster.environments))
for environment in uncoordinated_environments:
environment.process_group_id = current_environments_json_dict[environment.name]['id']
if environment.name in current_environments_json_dict:
environment.process_group_id = current_environments_json_dict[environment.name]['id']


def _create(
Expand Down Expand Up @@ -64,16 +65,18 @@ def _create(

if environment.parameter_context_name:
parameter_context = _get_parameter_context_by_name(environment.parameter_context_name, parameter_contexts)
if parameter_context is None or parameter_context.id is None:
if parameter_context is None or (parameter_context.is_coordinated and parameter_context.id is None):
logger.warning(f'Unable to find parameter context: {environment.parameter_context_name}, for project: {project.name}, environment: {environment.name}.')
return
create_json['component']['parameterContext'] = {
'id': parameter_context.id,
'component': {

if not (parameter_context.id is None):
create_json['component']['parameterContext'] = {
'id': parameter_context.id,
'name': environment.parameter_context_name
'component': {
'id': parameter_context.id,
'name': environment.parameter_context_name
}
}
}

try:
response = requests.post(**cluster._get_connection_details(post_url), json=create_json)
Expand Down Expand Up @@ -128,44 +131,52 @@ def _update(
)

):
environment_json['component']['name'] = environment.name
environment_json['component']['comments'] = environment.description

if 'parameterContext' in environment_json['component']:
del environment_json['component']['parameterContext']
update_json = {
'revision': environment_json['revision'],
'component': {
'id': environment_json['id'],
'name': environment.name,
'comments': environment.description,
}
}

if environment.parameter_context_name:
parameter_context = _get_parameter_context_by_name(environment.parameter_context_name, parameter_contexts)
if parameter_context is None or parameter_context.id is None:
if parameter_context is None or (parameter_context.is_coordinated and parameter_context.id is None):
logger.warning(f'Unable to find parameter context: {environment.parameter_context_name}, for project: {project.name}, environment: {environment.name}, in cluster: {cluster.name}.')
return
environment_json['component']['parameterContext'] = {
'id': parameter_context.id,
'component': {

if not (parameter_context.id is None):
update_json['component']['parameterContext'] = {
'id': parameter_context.id,
'name': environment.parameter_context_name
'component': {
'id': parameter_context.id,
'name': environment.parameter_context_name
}
}
}

# remove version control to update
if 'versionControlInformation' in environment_json['component']:
try:
delete_version_url = '/' + url_helper.construct_path_parts(['versions', 'process-groups', environment_json['id']])
response = requests.delete(
**cluster._get_connection_details(delete_version_url),
params={'clientId': environment_json['revision']['clientId'], 'version': str(environment_json['revision']['version'])})
**cluster._get_connection_details(delete_version_url), params=environment_json['revision'])
if response.status_code != 200:
logger.warning(response.text)
return
update_version = True
# update json with new revision
delete_version_response_json = response.json()
if 'processGroupRevision' in delete_version_response_json:
update_json['revision'] = delete_version_response_json['processGroupRevision']
except requests.exceptions.RequestException as exception:
logger.warning(f'Unable to temporarily delete version control from project: {project.name}, environment: {environment.name}, in cluster: {cluster.name}.')
logger.warning(exception)
return

try:
put_url = '/' + url_helper.construct_path_parts(['process-groups', environment_json['id']])
response = requests.put(**cluster._get_connection_details(put_url), json=environment_json)
response = requests.put(**cluster._get_connection_details(put_url), json=update_json)
if response.status_code != 200:
logger.warning(response.text)
return
Expand All @@ -189,8 +200,7 @@ def _delete(cluster: Cluster, project: Project, delete_environment_json):
delete_url = '/' + url_helper.construct_path_parts(['process-groups', delete_environment_json['component']['id']])
try:
response = requests.delete(
**cluster._get_connection_details(delete_url),
params={'clientId': delete_environment_json['revision']['clientId'], 'version': str(delete_environment_json['revision']['version'])})
**cluster._get_connection_details(delete_url), params=delete_environment_json['revision'])
if response.status_code != 200:
logger.warning(response.text)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def sync(cluster: Cluster, configured_parameter_contexts: list):

uncoordinated_parameter_contexts = list(filter(lambda pc: not pc.is_coordinated, configured_parameter_contexts))
for parameter_context in uncoordinated_parameter_contexts:
parameter_context.id = current_parameter_contexts_json_dict[parameter_context.name]['id']
if parameter_context.name in current_parameter_contexts_json_dict:
parameter_context.id = current_parameter_contexts_json_dict[parameter_context.name]['id']


def _create(cluster: Cluster, parameter_context: ParameterContext):
Expand Down