Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph-v committed Sep 23, 2021
1 parent b52b5e3 commit 81eafe9
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 125 deletions.
4 changes: 2 additions & 2 deletions delfin/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@
help='Delay for task cleanup before killing child in sec'),
cfg.IntOpt('group_change_detect_interval',
default=30,
help='Tooz group change detect interval in sec'),
help='Local executor group change detect interval in sec'),
cfg.IntOpt('max_storages_in_child',
default=5,
help='Max storages handled by one child process'),
help='Max storages handled by one local executor process'),
cfg.IntOpt('max_childs_in_node',
default=100000,
help='Max processes that can be spawned before forcing fail'),
Expand Down
5 changes: 0 additions & 5 deletions delfin/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,12 @@ def __init__(self, agent_id):
__init__(agent_id=agent_id, prefix="")

def create_group(self, group):
# Create the group
try:
self.coordinator.create_group(group.encode()).get()
except coordination.GroupAlreadyExist:
LOG.info("GROUP {0} already exist".format(group))

def delete_group(self, group):
# Create the group
try:
self.coordinator.delete_group(group.encode()).get()
except coordination.GroupNotCreated:
Expand All @@ -378,21 +376,18 @@ def delete_group(self, group):

def join_group(self, group):
try:
# Join the group
self.coordinator.join_group(group.encode()).get()
except coordination.MemberAlreadyExist:
LOG.info('Member %s already in group' % group)

def leave_group(self, group):
try:
# Join the group
self.coordinator.leave_group(group.encode()).get()
except coordination.GroupNotCreated:
LOG.info('Group %s not created' % group)

def get_members(self, group):
try:
# Join the group
return self.coordinator.get_members(group.encode()).get()
except coordination.GroupNotCreated:
LOG.info('Group %s not created' % group)
Expand Down
184 changes: 73 additions & 111 deletions delfin/task_manager/metrics_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from oslo_service import service as oslo_ser

from delfin import db
from delfin import exception
from delfin import manager
from delfin.coordination import ConsistentHashing, GroupMembership
from delfin import context as ctxt
Expand All @@ -34,7 +35,6 @@
import FailedJobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import JobHandler
from delfin.task_manager.tasks import telemetry

LOG = log.getLogger(__name__)
CONF = cfg.CONF
Expand All @@ -46,7 +46,6 @@ class MetricsTaskManager(manager.Manager):
RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
self.telemetry_task = telemetry.TelemetryTask()
super(MetricsTaskManager, self).__init__(*args, **kwargs)
scheduler = schedule_manager.SchedulerManager()
scheduler.start()
Expand All @@ -72,8 +71,8 @@ def assign_job(self, context, task_id, executor):
instance.schedule_job(task_id)
else:
if not self.watch_job_id:
self._init_watchers(executor)
local_executor = self._get_local_executor(
self.init_watchers(executor)
local_executor = self.get_local_executor(
context, task_id, None, executor)
self.rpcapi.assign_job_local(context, task_id, local_executor)

Expand All @@ -82,25 +81,26 @@ def remove_job(self, context, task_id, executor):
instance = JobHandler.get_instance(context, task_id)
instance.remove_job(task_id)
else:
job = db.task_get(context, task_id)
storage_id = job['storage_id']
for name in self.executor_map.keys():
for storage in self.executor_map[name]["storages"]:
if task_id in storage["task_ids"]:
local_executor = "{0}:{1}".format(executor, name)
self.rpcapi.remove_job_local(
context, task_id, local_executor)
storage["task_ids"].remove(task_id)
return self._check_and_stop_executor(
name, local_executor, storage)
if storage_id in self.executor_map[name]["storages"]:
local_executor = "{0}:{1}".format(executor, name)
self.rpcapi.remove_job_local(
context, task_id, local_executor)
tasks, failed_tasks = self.get_all_tasks(storage_id)
if len(failed_tasks) == 0 and len(tasks) == 0:
self.stop_executor(name, local_executor, storage_id)

def assign_failed_job(self, context, failed_task_id, executor):
if not self.enable_sub_process:
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.schedule_failed_job(failed_task_id)
else:
if not self.watch_job_id:
self._init_watchers(executor)
self.init_watchers(executor)

local_executor = self._get_local_executor(
local_executor = self.get_local_executor(
context, None, failed_task_id, executor)
self.rpcapi.assign_failed_job_local(
context, failed_task_id, local_executor)
Expand All @@ -110,15 +110,16 @@ def remove_failed_job(self, context, failed_task_id, executor):
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.remove_failed_job(failed_task_id)
else:
job = db.failed_task_get(context, failed_task_id)
storage_id = job['storage_id']
for name in self.executor_map.keys():
for storage in self.executor_map[name]["storages"]:
if failed_task_id in storage["failed_task_ids"]:
local_executor = "{0}:{1}".format(executor, name)
self.rpcapi.remove_failed_job_local(
context, failed_task_id, local_executor)
storage["failed_task_ids"].remove(failed_task_id)
return self._check_and_stop_executor(
name, local_executor, storage)
if storage_id in self.executor_map[name]["storages"]:
local_executor = "{0}:{1}".format(executor, name)
self.rpcapi.remove_failed_job_local(
context, failed_task_id, local_executor)
tasks, failed_tasks = self.get_all_tasks(storage_id)
if len(failed_tasks) == 0 and len(tasks) == 0:
self.stop_executor(name, local_executor, storage_id)

def schedule_boot_jobs(self, executor):
"""Schedule periodic collection if any task is currently assigned to
Expand Down Expand Up @@ -147,7 +148,7 @@ def schedule_boot_jobs(self, executor):
else:
LOG.debug("Boot job scheduling completed.")

def _init_watchers(self, group):
def init_watchers(self, group):
watcher = GroupMembership(agent_id=group)
watcher.start()
watcher.create_group(group)
Expand Down Expand Up @@ -193,21 +194,23 @@ def on_process_leave(self, event):
host = event.group_id.decode('utf-8')
LOG.info("Re-create process {0} in {1} that is handling tasks"
.format(executor_topic, host))
launcher = self._create_process(executor_topic, host)
launcher = self.create_process(executor_topic, host)
self.executor_map[name]["launcher"] = launcher
context = ctxt.get_admin_context()
for storage in self.executor_map[name]["storages"]:
for task_id in storage["task_ids"]:
for storage_id in self.executor_map[name]["storages"]:
tasks, failed_tasks = self.get_all_tasks(storage_id)
for task in tasks:
LOG.info("Re-scheduling task {0} of storage {1}"
.format(task_id, storage["storage_id"]))
.format(task['id'], storage_id))
self.rpcapi.assign_job_local(
context, task_id, executor_topic)
context, task['id'], executor_topic)

for f_task_id in storage["failed_task_ids"]:
LOG.info("Re-scheduling failed task {0} of storage {1}"
.format(f_task_id, storage["storage_id"]))
for f_task in failed_tasks:
LOG.info("Re-scheduling failed failed task {0},"
" of storage {1}"
.format(f_task['id'], storage_id))
self.rpcapi.assign_failed_job_local(
context, f_task_id, executor_topic)
context, f_task['id'], executor_topic)

def process_cleanup(self):
LOG.info('Periodic process cleanup called')
Expand All @@ -232,21 +235,7 @@ def process_cleanup(self):
self.executor_map[name]["launcher"].stop()
self.executor_map.pop(name)

# Update executer map with deleted task status
context = ctxt.get_admin_context()
for name in executor_names:
for storage in self.executor_map[name]["storages"]:
filters = {'storage_id': storage['storage_id'], 'deleted': True}
deleted_tasks = db.task_get_all(context, filters=filters)
deleted_f_tasks = db.failed_task_get_all(context, filters=filters)
for task_id in deleted_tasks:
if task_id in storage["task_ids"]:
storage["task_ids"].remove(task_id)
for f_task_id in deleted_f_tasks:
if f_task_id in storage["task_ids"]:
storage["failed_task_ids"].remove(f_task_id)

def _create_process(self, topic=None, host=None):
def create_process(self, topic=None, host=None):
metrics_task_server = service. \
MetricsService.create(binary='delfin-task',
topic=topic,
Expand All @@ -260,56 +249,33 @@ def _create_process(self, topic=None, host=None):
launcher.launch_service(metrics_task_server, workers=1)
return launcher

def _get_local_executor(self, context, task_id, failed_task_id, executor):
def get_local_executor(self, context, task_id, failed_task_id, executor):
executor_names = self.executor_map.keys()
task = task_id if task_id else failed_task_id
job = db.task_get(context, task)
storage_id = job['storage_id']
storage_id = None
if task_id:
job = db.task_get(context, task_id)
storage_id = job['storage_id']
elif failed_task_id:
job = db.failed_task_get(context, failed_task_id)
storage_id = job['storage_id']
else:
raise exception.InvalidInput("Missing task id")

# Task already exists
# Storage already exists
for name in executor_names:
executor_topic = "{0}:{1}".format(executor, name)
for storage_exec in self.executor_map[name]["storages"]:
if storage_exec["storage_id"] == storage_id:
storage = storage_exec
if task_id:
if task_id not in storage["task_ids"]:
storage["task_ids"].append(task_id)
if failed_task_id:
if failed_task_id not in storage_exec["failed_task_ids"]:
storage["failed_task_ids"].append(failed_task_id)
return executor_topic
if storage_id in self.executor_map[name]["storages"]:
return executor_topic

# Return existing executor_topic
for name in executor_names:
no_of_storages = len(self.executor_map[name]["storages"])
if no_of_storages and (no_of_storages <
CONF.telemetry.max_storages_in_child):
executor_topic = "{0}:{1}".format(executor, name)
LOG.info("Selecting existing local executor {0} for task {1}"
.format(executor_topic, task))
storage = None
for storage_exec in self.executor_map[name]["storages"]:
if storage_exec["storage_id"] == storage_id:
LOG.info("The storage {0} for task {1} in executor"
.format(storage_id, task))
storage = storage_exec
break
if not storage:
LOG.info("The storage {0} for task {1} not in executor."
" Creating the storage entry in executor."
.format(storage_id, task))
storage = {
"storage_id": storage_id,
"task_ids": [],
"failed_task_ids": []
}
self.executor_map[name]["storages"].append(storage)

if task_id:
storage["task_ids"].append(task_id)
if failed_task_id:
storage["failed_task_ids"].append(failed_task_id)
LOG.info("Selecting existing local executor {0} for {1}"
.format(executor_topic, storage_id))
self.executor_map[name]["storages"].append(storage_id)
return executor_topic

# Return executor_topic after creating one
Expand All @@ -318,41 +284,36 @@ def _get_local_executor(self, context, task_id, failed_task_id, executor):
if name not in executor_names:
executor_topic = "{0}:{1}".format(executor, name)
LOG.info("Create a new local executor {0} for {1}"
.format(executor_topic, task))
launcher = self._create_process(
.format(executor_topic, storage_id))
launcher = self.create_process(
topic=executor_topic, host=executor)
self.executor_map[name] = {
"storages": [],
"launcher": None,
"storages": [storage_id],
"launcher": launcher,
"cleanup_delay": 0
}
storage = {
"storage_id": storage_id,
"task_ids": [],
"failed_task_ids": []
}
if task_id:
storage["task_ids"].append(task_id)
if failed_task_id:
storage["failed_task_ids"].append(failed_task_id)
self.executor_map[name]["storages"].append(storage)
self.executor_map[name]["launcher"] = launcher
return executor_topic

msg = "Reached maximum number of ({0}) local executors". \
format(CONF.telemetry.max_childs_in_node)
LOG.error(msg)
raise RuntimeError(msg)

def _check_and_stop_executor(self, name, local_executor, storage):
tasks_count = len(storage["task_ids"])
failed_tasks_count = len(storage["failed_task_ids"])
if tasks_count == 0 and failed_tasks_count == 0:
LOG.info("Stop and remove local executor {0}"
.format(local_executor))
self.executor_map[name]["storages"].remove(storage)
self.executor_map[name]["cleanup_delay"] = \
CONF.telemetry.task_cleanup_delay
def get_all_tasks(self, storage_id):
filters = {'storage_id': storage_id,
'deleted': False}
context = ctxt.get_admin_context()
tasks = db.task_get_all(context, filters=filters)
failed_tasks = db.failed_task_get_all(context, filters=filters)
return tasks, failed_tasks

def stop_executor(self, name, local_executor, storage_id):
LOG.info("Stop and remove local executor {0}"
.format(local_executor))
if storage_id in self.executor_map[name]["storages"]:
self.executor_map[name]["storages"].remove(storage_id)
self.executor_map[name]["cleanup_delay"] = \
CONF.telemetry.task_cleanup_delay

def stop(self):
"""Cleanup periodic jobs"""
Expand All @@ -364,8 +325,9 @@ def stop(self):
self.watcher.delete_group(self.group)
if self.watcher:
self.watcher.stop()
if self.scheduler:
self.scheduler.shutdown()
self.watch_job_id = None
self.cleanup_job_id = None
self.group = None
self.watcher = None
self.scheduler.shutdown()
7 changes: 4 additions & 3 deletions delfin/task_manager/scheduler/schedule_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ def on_node_join(self, event):
if new_executor != origin_executor:
LOG.info('Re-distribute failed_job %s from %s to %s' %
(failed_task['id'], origin_executor, new_executor))
self.task_rpcapi.remove_job(self.ctx, task['id'],
task['executor'])
distributor.distribute_failed_job(failed_task['id'])
self.task_rpcapi.remove_failed_job(
self.ctx, failed_task['id'], failed_task['executor'])
distributor.distribute_failed_job(failed_task['id'],
task['executor'])
partitioner.stop()

def on_node_leave(self, event):
Expand Down
4 changes: 2 additions & 2 deletions delfin/task_manager/subprocess_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
periodical task manager for metric collection tasks**
Subprocess metrics manager for metric collection tasks**
"""

from oslo_log import log
Expand All @@ -32,7 +32,7 @@


class SubprocessManager(manager.Manager):
"""manage periodical tasks"""
"""manage periodical collection tasks in subprocesses"""

RPC_API_VERSION = '1.0'

Expand Down
Loading

0 comments on commit 81eafe9

Please sign in to comment.