Support dynamic sub-processes for metrics collection #708

Merged (2 commits) on Sep 28, 2021
delfin/common/config.py (+18 additions, 0 deletions)

@@ -119,6 +119,24 @@
               .DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE,
               help='default history(in sec) to be collected on a job '
                    'reschedule'),
    cfg.BoolOpt('enable_dynamic_subprocess',
                default=False,
                help='Enable dynamic subprocess metrics collection'),
    cfg.IntOpt('process_cleanup_interval',
               default=60,
               help='Background process cleanup call interval in sec'),
    cfg.IntOpt('task_cleanup_delay',
               default=10,
               help='Delay for task cleanup before killing child in sec'),
    cfg.IntOpt('group_change_detect_interval',
               default=30,
               help='Local executor group change detect interval in sec'),
    cfg.IntOpt('max_storages_in_child',
               default=5,
               help='Max storages handled by one local executor process'),
    cfg.IntOpt('max_childs_in_node',
               default=100000,
               help='Max processes that can be spawned before forcing fail'),
]
Review thread on max_childs_in_node:

Collaborator: Is this default value OK? It means allowing 100000 processes?

Collaborator (Author): Currently we do not restrict the number of processes created; the large default is the point at which an exception is raised. Also, deleting a process once it has no storages left to handle takes about 90 seconds, so a large number provides a buffer when storages are created and deleted frequently.

Member: @ThisIsClark This can be customised per user environment based on their deployment configuration. For example, it can be set to the number of cores available in a node.

Collaborator: If the number of storages is greater than the limit, what will happen?

Collaborator (Author): We are not restricting storages; storages can still be registered.

CONF.register_opts(telemetry_opts, "telemetry")
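Taken together, the help texts above imply that each local executor child handles at most max_storages_in_child storages while a node refuses to spawn more than max_childs_in_node children, so with the defaults a node could in principle distribute 5 x 100000 = 500000 storages before spawning fails. A minimal delfin.conf sketch for enabling the feature (values are illustrative, not recommendations; semantics are inferred from the help strings above):

[telemetry]
enable_dynamic_subprocess = True
process_cleanup_interval = 60
task_cleanup_delay = 10
group_change_detect_interval = 30
max_storages_in_child = 5
# Per the review discussion, tune this to the deployment,
# e.g. the number of cores available in the node:
max_childs_in_node = 16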
delfin/coordination.py (+52 additions, 2 deletions)

@@ -15,7 +15,7 @@
import inspect

import decorator
-import tooz

from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
@@ -334,7 +334,7 @@ def __init__(self):
    def join_group(self):
        try:
            self.coordinator.join_partitioned_group(self.GROUP_NAME)
-        except tooz.coordination.MemberAlreadyExist:
+        except coordination.MemberAlreadyExist:
            LOG.info('Member %s already in partitioner_group' % CONF.host)

    def get_task_executor(self, task_id):
@@ -350,3 +350,53 @@ def register_watcher_func(self, on_node_join, on_node_leave):

    def watch_group_change(self):
        self.coordinator.run_watchers()


class GroupMembership(Coordinator):

    def __init__(self, agent_id):
        super(GroupMembership, self). \
            __init__(agent_id=agent_id, prefix="")

    def create_group(self, group):
        try:
            self.coordinator.create_group(group.encode()).get()
Review thread on the .get() call above:

Collaborator: Why call get() after calling create_group()?

Collaborator (Author): create_group() is an async call; the following get() ensures that group creation has completed.

        except coordination.GroupAlreadyExist:
            LOG.info("Group {0} already exist".format(group))

    def delete_group(self, group):
        try:
            self.coordinator.delete_group(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info("Group {0} not created".format(group))
        except coordination.GroupNotEmpty:
            LOG.info("Group {0} not empty".format(group))
        except coordination.ToozError:
            LOG.info("Group {0} internal error while delete".format(group))

    def join_group(self, group):
        try:
            self.coordinator.join_group(group.encode()).get()
        except coordination.MemberAlreadyExist:
            LOG.info('Member %s already in group' % group)

    def leave_group(self, group):
        try:
            self.coordinator.leave_group(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info('Group %s not created' % group)

    def get_members(self, group):
        try:
            return self.coordinator.get_members(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info('Group %s not created' % group)

        return None

    def register_watcher_func(self, group, on_process_join, on_process_leave):
        self.coordinator.watch_join_group(group.encode(), on_process_join)
        self.coordinator.watch_leave_group(group.encode(), on_process_leave)

    def watch_group_change(self):
        self.coordinator.run_watchers()
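As background for the get() discussion above: tooz coordinator operations return async result objects, and calling get() blocks until the backend confirms the operation, raising the mapped exception on failure. A minimal standalone sketch; the backend URL and member/group names are placeholders, not taken from this PR:

from tooz import coordination

# Placeholder backend; any tooz driver (redis, etcd3, zookeeper) behaves the same.
coordinator = coordination.get_coordinator('redis://127.0.0.1:6379', b'executor-1')
coordinator.start()

request = coordinator.create_group(b'local-executor-group')  # returns an async result
try:
    request.get()  # blocks until the group is actually created
except coordination.GroupAlreadyExist:
    pass  # same condition GroupMembership.create_group logs and ignores

coordinator.stop()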
delfin/service.py (+23 additions, 0 deletions)

@@ -278,6 +278,29 @@ def start(self):
        super(TaskService, self).start()


class MetricsService(Service):
    """Service object for triggering metrics manager functionalities.
    """

    @classmethod
    def create(cls, host=None, binary=None, topic=None,
               manager=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False, *args, **kwargs):
        service_obj = super(MetricsService, cls).create(
            host=host, binary=binary, topic=topic, manager=manager,
            periodic_interval=periodic_interval,
            periodic_fuzzy_delay=periodic_fuzzy_delay,
            service_name=service_name,
            coordination=coordination, *args, **kwargs)

        return service_obj

    def start(self):
        super(MetricsService, self).start()
        self.manager.init_scheduler(self.topic, self.host)

class LeaderElectionService(service.Service):
"""Leader election service for distributed system
