Historic performance collection when last runtime not set #765

Merged
Changes from 3 commits
56 changes: 40 additions & 16 deletions delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py
@@ -22,6 +22,7 @@
from delfin import db
from delfin.common.constants import TelemetryCollection, TelemetryJobStatus
from delfin.exception import TaskNotFound
from delfin.i18n import _
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask
@@ -49,6 +50,25 @@ def get_instance(ctx, task_id):
return JobHandler(ctx, task_id, task['storage_id'],
task['args'], task['interval'])

def perform_history_collection(self, start_time, end_time, last_run_time):
# Trigger one historic collection to make sure we do not
# miss any data points due to reschedule
LOG.debug('Triggering one historic collection for task %s',
self.task_id)
try:
telemetry = PerformanceCollectionTask()
ret = telemetry.collect(self.ctx, self.storage_id, self.args,
start_time, end_time)
LOG.debug('Historic collection performed for task %s with '
'result %s' % (self.task_id, ret))
db.task_update(self.ctx, self.task_id,
{'last_run_time': last_run_time})
except Exception as e:
msg = _("Failed to collect performance metrics during history "
"collection for storage id:{0}, reason:{1}"
.format(self.storage_id, six.text_type(e)))
LOG.error(msg)

def schedule_job(self, task_id):

if self.stopped:
@@ -91,27 +111,31 @@ def schedule_job(self, task_id):
LOG.info('Periodic collection tasks scheduled for job id: '
'%s ' % self.task_id)

# Check if historic collection is needed for this task.
# If the last run time is already set, adjust start_time based on
# last run time or history_on_reschedule, whichever is smaller.
# If the job id is created but last run time is not yet set, then
# adjust start_time based on interval or history_on_reschedule,
# whichever is smaller.

end_time = current_time * 1000
# Maximum supported history duration on restart
history_on_reschedule = CONF.telemetry. \
performance_history_on_reschedule
if job['last_run_time']:
# Trigger one historic collection to make sure we do not
# miss any Data points due to reschedule
LOG.debug('Triggering one historic collection for job %s',
job['id'])
# Maximum supported history duration on restart
history_on_reschedule = CONF.telemetry. \
performance_history_on_reschedule
# Adjust start_time and end_time based on last_run_time
end_time = current_time * 1000
start_time = job['last_run_time'] * 1000 \
if current_time - job['last_run_time'] < \
history_on_reschedule \
else (end_time - history_on_reschedule * 1000)
telemetry = PerformanceCollectionTask()
telemetry.collect(self.ctx, self.storage_id,
self.args,
start_time, end_time)

db.task_update(self.ctx, self.task_id,
{'last_run_time': last_run_time})
self.perform_history_collection(start_time, end_time,
last_run_time)
elif existing_job_id:
interval_in_sec = job['interval']
start_time = (end_time - interval_in_sec * 1000) \
if interval_in_sec < history_on_reschedule \
else (end_time - history_on_reschedule * 1000)
self.perform_history_collection(start_time, end_time,
last_run_time)
else:
LOG.info('Job already exists with this scheduler')

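As a reading aid, the window-selection logic described in the comments above can be reduced to a standalone sketch. Everything in it is illustrative: the helper name pick_window and the sample values do not appear in the PR, which reads the cap from CONF.telemetry.performance_history_on_reschedule and the times from the job record. It shows the two branches: when last_run_time is set the window resumes from the last run, otherwise it falls back to one collection interval, and in both cases it is capped at history_on_reschedule.

# Minimal sketch of the history-window selection in schedule_job.
# pick_window and the sample values below are illustrative only.
def pick_window(current_time, history_on_reschedule,
                last_run_time=None, interval=None):
    """Return (start_time, end_time) in milliseconds.

    current_time, last_run_time, interval and history_on_reschedule are
    in seconds, matching how the diff multiplies them by 1000.
    """
    end_time = current_time * 1000
    if last_run_time:
        # Resume from the last run, capped at the maximum supported
        # history duration on reschedule.
        start_time = (last_run_time * 1000
                      if current_time - last_run_time < history_on_reschedule
                      else end_time - history_on_reschedule * 1000)
    else:
        # last_run_time not yet set: fall back to one interval of
        # history, again capped at history_on_reschedule.
        start_time = (end_time - interval * 1000
                      if interval < history_on_reschedule
                      else end_time - history_on_reschedule * 1000)
    return start_time, end_time

# Worked examples (current time 10,000 s, cap 300 s):
print(pick_window(10000, 300, last_run_time=9900))  # (9900000, 10000000)
print(pick_window(10000, 300, interval=60))         # (9940000, 10000000)
print(pick_window(10000, 300, interval=900))        # (9700000, 10000000)

After either branch, the change hands the computed window to perform_history_collection, which runs PerformanceCollectionTask.collect for that range and only then records last_run_time.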