Skip to content

Commit

Permalink
Common: Replace string formatter with f-strings, format context mess…
Browse files Browse the repository at this point in the history
…age uniformly rucio#129

            Changes to metric names include:
            * undertaker_expired_dids -> expired_dids.total
            * fts3.{hostname}.submitted -> fts_backlog.submitted.{hostname}
            * hermes_queues_messages.queues.messages -> messages_to_submit.queues.messages
            * transmogrifier_new_dids -> new_dids
            * judge_stuck_rules_without_missing_source_replica -> stuck_rules.{source_status} (source_status = [without_missing_source_replica, with_missing_source_replica])
            * check_transfer_queues_status -> transfer_queues_status
            * judge.waiting_dids -> unevaluated_dids
            * reaper.unlocked_replicas -> unlocked_replicas.{replica_status} (replica_status = [expired, unlocked])
            * judge.updated_dids -> updated_dids
  • Loading branch information
voetberg committed Feb 20, 2024
1 parent 072758c commit 7992510
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 84 deletions.
3 changes: 2 additions & 1 deletion common/check_expired_dids
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ if __name__ == "__main__":
# Possible check against a threshold. If result > max_value then sys.exit(CRITICAL)

manager.gauge('expired_dids.total',
documentation="All expired dids").set(result)
documentation="All expired dids"
).set(result)

except:
print(traceback.format_exc())
Expand Down
36 changes: 16 additions & 20 deletions common/check_fts_backlog
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ if __name__ == "__main__":

errmsg = ''
for ftshost in FTSHOSTS.split(','):
print("=== %s ===" % ftshost)
print(f"=== {ftshost} ===")
parsed_url = urlparse(ftshost)
scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port
retvalue = CRITICAL
url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO)
url = f'{ftshost}/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo={VO}'
busy_channels = []
busylimit = 5000
for attempt in range(0, 5):
Expand All @@ -108,7 +108,7 @@ if __name__ == "__main__":
pass

if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO, src, dst)
url_activities = f'{ftshost}/fts3/ftsmon/config/activities/{VO}?source_se={src}&dest_se={dst}'
activities = {}
try:
s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
Expand All @@ -120,36 +120,33 @@ if __name__ == "__main__":
'activities': activities})
summary = res['summary']
hostname = hostname.replace('.', '_')
# If printing these indiv is important, why not monitor them seperately?
print('%s : Submitted : %s' % (hostname, summary['submitted']))
print('%s : Active : %s' % (hostname, summary['active']))
print('%s : Staging : %s' % (hostname, summary['staging']))
print('%s : Started : %s' % (hostname, summary['started']))

for state in ['submitted', 'active', 'staging', 'started']:
print(f'{hostname} : {state.capitalize()} : {summary[state]}')


if busy_channels != []:
print('Busy channels (>%s submitted):' % busylimit)
print(f'Busy channels (>{busylimit} submitted):')
for bc in busy_channels:
activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()])
print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'],
str(activities_str)))
activities_str = ", ".join([(f"{key}: {val}") for key, val in bc['activities'].items()])
print(f"{bc['src']} to {bc['dst']} : {bc['submitted']} submitted jobs ({activities_str})")

# Add to metrics
backlog_count = summary['submitted'] + summary['active'] + summary['staging'] + summary['started']
manager.gauge(
"fts_backlog.submitted.{hostname}",
documentation="All submitted, active, staging, or started in FTS queue").labels(hostname=hostname).set(backlog_count)
manager.gauge("fts_backlog.submitted.{hostname}",
documentation="All submitted, active, staging, or started in FTS queue"
).labels(hostname=hostname).set(backlog_count)

retvalue = OK
break
except Exception as error:
retvalue = CRITICAL
if result and result.status_code:
errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % (
ftshost, str(result.status_code), str(error))
errmsg = f'Error when trying to get info from {ftshost} : HTTP status code {result.status_code}. {error}'
else:
errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error))
errmsg = f'Error when trying to get info from {ftshost}. {error}'
if retvalue == CRITICAL:
print("All attempts failed. %s" % errmsg)
print(f"All attempts failed. {errmsg}")
WORST_RETVALUE = max(retvalue, WORST_RETVALUE)


Expand Down Expand Up @@ -179,7 +176,6 @@ if __name__ == "__main__":
except:
sys.exit(WORST_RETVALUE)

# Does this not do the same thing as the query? Why the duplicate?
for source_rse, dest_rse in se_matrix:
for source_rse_id in se_map[source_rse]:
for dest_rse_id in se_map[dest_rse]:
Expand Down
3 changes: 2 additions & 1 deletion common/check_messages_to_submit
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ if __name__ == "__main__":

manager.gauge(
"messages_to_submit.queues.messages",
documentation="Messages in queue, to submit").set(message_count)
documentation="Messages in queue, to submit"
).set(message_count)

if message_count > 100000:
sys.exit(WARNING)
Expand Down
136 changes: 80 additions & 56 deletions common/check_obsolete_replicas
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,77 +8,101 @@
# Authors:
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2015
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2018
# - Maggie Voetberg, <maggiev@fnal.gov>, 2024

'''
Probe to check the backlog of obsolete replicas.
'''

import sys
import traceback
from sqlalchemy.sql import text
from rucio.db.sqla.session import BASE, get_session
from utils.common import PrometheusPusher

from rucio.db.sqla.session import get_session
if BASE.metadata.schema:
schema = BASE.metadata.schema + '.'
else:
schema = ''

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3


if __name__ == "__main__":
try:
SESSION = get_session()
QUERY = '''BEGIN
FOR u in (SELECT
a.rse_id AS rse_id,
NVL(b.files, 0) AS files,
NVL(b.bytes, 0) AS bytes,
SYS_EXTRACT_UTC(localtimestamp) AS updated_at
FROM
(
SELECT
id AS rse_id
FROM
atlas_rucio.rses
WHERE
deleted=0) a
LEFT OUTER JOIN
(
SELECT
/*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */
rse_id,
COUNT(1) AS files,
SUM(bytes) AS bytes
FROM
atlas_rucio.replicas
WHERE
(
CASE
WHEN tombstone IS NOT NULL
THEN rse_id
END) IS NOT NULL
AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS')
GROUP BY
rse_id) b
ON
a.rse_id=b.rse_id)
session = get_session()
with PrometheusPusher() as manager:
query = '''BEGIN
FOR u in (SELECT
a.rse_id AS rse_id,
NVL(b.files, 0) AS files,
NVL(b.bytes, 0) AS bytes,
SYS_EXTRACT_UTC(localtimestamp) AS updated_at
FROM
(
SELECT
id AS rse_id
FROM
{schema}rses
WHERE
deleted=0) a
LEFT OUTER JOIN
(
SELECT
/*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */
rse_id,
COUNT(1) AS files,
SUM(bytes) AS bytes
FROM
{schema}replicas
WHERE
(
CASE
WHEN tombstone IS NOT NULL
THEN rse_id
END) IS NOT NULL
AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS')
GROUP BY
rse_id) b
ON
a.rse_id=b.rse_id)
LOOP
MERGE INTO atlas_rucio.RSE_USAGE
USING DUAL
ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'obsolete')
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at)
WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at;
LOOP
MERGE INTO {schema}RSE_USAGE
USING DUAL
ON ({schema}RSE_USAGE.rse_id = u.rse_id and source = 'obsolete')
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at)
WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at;
MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H
USING DUAL
ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at)
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at);
MERGE INTO {schema}RSE_USAGE_HISTORY H
USING DUAL
ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at)
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at);
COMMIT;
END LOOP;
END;
'''
SESSION.execute(QUERY)
except Exception as error:
print error
COMMIT;
END LOOP;
END;'''.format(schema=schema)

for result in session.execute(text(query)):
print(result)

# SELECT order is (rse_id, files, bytes, updated_at); note the query is a
# PL/SQL BEGIN...END block, which may return no rows at all — TODO confirm
rse_id = result[0]
bytes_sum = result[2]
files_count = result[1]

manager.gauge(name="obsolete_replicas_files.{rse}",
documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(files_count)

manager.gauge(name="obsolete_replicas_bytes.{rse}",
documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(bytes_sum)


except:
print(traceback.format_exc())
sys.exit(UNKNOWN)
finally:
session.remove()
sys.exit(OK)
6 changes: 3 additions & 3 deletions common/check_stuck_rules
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ if __name__ == "__main__":
with PrometheusPusher() as manager:
for source_status, query in queries.items():
result = session.execute(sql_text(query)).fetchone()[0]
manager.gauge(
"stuck_rules.{source_status}",
documentation="Backlog of stuck rules").labels(source_status=source_status).set(result)
manager.gauge("stuck_rules.{source_status}",
documentation="Backlog of stuck rules"
).labels(source_status=source_status).set(result)

except:
print(traceback.format_exc())
Expand Down
4 changes: 3 additions & 1 deletion common/check_transfer_queues_status
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ if __name__ == "__main__":
activity = items[3]
external_host = items[4]

manager.gauge("transfer_queues_status.{activity}.{state}.{external_host}").labels(activity=activity, state=state, external_host=external_host).set(count)
manager.gauge(
"transfer_queues_status.{activity}.{state}.{external_host}"
).labels(activity=activity, state=state, external_host=external_host).set(count)

except Exception as e:
print(f"Error: {e}")
Expand Down
4 changes: 2 additions & 2 deletions common/check_unlocked_replicas
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ if __name__ == "__main__":
}

with PrometheusPusher() as manager:
for did_status, query in queries.items():
for replica_status, query in queries.items():
result = session.execute(sql_text(query)).fetchone()[0]
manager.gauge("unlocked_dids.{did_status}").labels(did_status=did_status).set(result)
manager.gauge("unlocked_replicas.{replica_status}").labels(replica_status=replica_status).set(result)

except:
sys.exit(UNKNOWN)
Expand Down

0 comments on commit 7992510

Please sign in to comment.