Skip to content

Commit

Permalink
Replace WorkQueue replication filter by selector function
Browse files Browse the repository at this point in the history
Update parameter names to reflect WQE

Nested workqueue structure

Escape dots in the field name

Escape dots in the right field

Escape dots in AgentStatusPoller as well
  • Loading branch information
amaltaro committed Oct 13, 2024
1 parent b5722fb commit 0996dad
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
8 changes: 8 additions & 0 deletions etc/WMAgentConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,14 @@
config.AgentStatusWatcher.userAMQ = "OVERWRITE_BY_SECRETS"
config.AgentStatusWatcher.passAMQ = "OVERWRITE_BY_SECRETS"
config.AgentStatusWatcher.topicAMQ = "OVERWRITE_BY_SECRETS"
# Replication filter setup for WorkQueue. Original javascript implementation in:
# https://github.com/dmwm/WMCore/blob/master/src/couchapps/WorkQueue/filters/queueFilter.js
# NOTE that we need to escape the 'type' field to avoid JSON dot notation interpretation
config.AgentStatusWatcher.workqueueSelector = {'_deleted': {"$exists": False},
'type': "WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement",
'WMCore\.WorkQueue\.DataStructs\.WorkQueueElement\.WorkQueueElement':
{'ParentQueueUrl': config.WorkQueueManager.queueParams["ParentQueueCouchUrl"],
'ChildQueueUrl': config.WorkQueueManager.queueParams["QueueURL"]}}

config.component_("RucioInjector")
config.RucioInjector.namespace = "WMComponent.RucioInjector.RucioInjector"
Expand Down
25 changes: 14 additions & 11 deletions src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def __init__(self, config):
self.topicAMQ = getattr(config.AgentStatusWatcher, "topicAMQ", None)
self.hostPortAMQ = getattr(config.AgentStatusWatcher, "hostPortAMQ", [('cms-mb.cern.ch', 61313)])

# WorkQueue replication
self.workqueueSelector = getattr(config.AgentStatusWatcher, "workqueueSelector", {})

# T0 doesn't have WorkQueue, so some monitoring/replication code has to be skipped here
if hasattr(self.config, "Tier0Feeder"):
self.isT0agent = True
Expand Down Expand Up @@ -94,22 +97,22 @@ def setUpCouchDBReplication(self):
'filter': "T0Request/repfilter"})
else:
# set up workqueue replication
wqfilter = 'WorkQueue/queueFilter'
parentQURL = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
childURL = self.config.WorkQueueManager.queueParams["QueueURL"]
query_params = {'childUrl': childURL, 'parentUrl': sanitizeURL(parentQURL)['url']}
parentQueueUrl = self.workqueueSelector["WMCore\.WorkQueue\.DataStructs\.WorkQueueElement\.WorkQueueElement"]["ParentQueueUrl"]
localQInboxURL = "%s_inbox" % self.config.AnalyticsDataCollector.localQueueURL
self.replicatorDocs.append({'source': sanitizeURL(parentQURL)['url'], 'target': localQInboxURL,
'filter': wqfilter, 'query_params': query_params})
self.replicatorDocs.append({'source': localQInboxURL, 'target': parentQURL,
'filter': wqfilter, 'query_params': query_params})
self.replicatorDocs.append({'source': parentQueueUrl,
'target': localQInboxURL,
'selector': self.workqueueSelector})
self.replicatorDocs.append({'source': localQInboxURL,
'target': parentQueueUrl,
'selector': self.workqueueSelector})

logging.info("Going to create %d new replication documents", len(self.replicatorDocs))
for rp in self.replicatorDocs:
logging.info("Creating replication document: %s", rp)
resp = self.localCouchMonitor.couchServer.replicate(rp['source'], rp['target'],
continuous=True,
filter=rp['filter'],
query_params=rp.get('query_params', False))
filter=rp.get('filter', False),
query_params=rp.get('query_params', False),
selector=rp.get('selector', False),)
logging.info(".. response for the replication document creation was: %s", resp)

def setup(self, parameters):
Expand Down
5 changes: 4 additions & 1 deletion src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ def connectDatabase(self, dbname='database', create=True, size=1000):

def replicate(self, source, destination, continuous=False,
create_target=False, cancel=False, doc_ids=False,
filter=False, query_params=False, sleepSecs=0):
filter=False, query_params=False, sleepSecs=0, selector=False):
"""
Trigger replication between source and destination. CouchDB options are
defined in: https://docs.couchdb.org/en/3.1.2/api/server/common.html#replicate
Expand All @@ -1039,6 +1039,7 @@ def replicate(self, source, destination, continuous=False,
this filter is expected to have been defined in the design doc.
:param query_params: dictionary of parameters to pass over to the filter function
:param sleepSecs: amount of seconds to sleep after the replication job is created
:param selector: a new'ish feature for filter functions in Erlang
:return: status of the replication creation
"""
listDbs = self.listDatabases()
Expand All @@ -1064,6 +1065,8 @@ def replicate(self, source, destination, continuous=False,
data["filter"] = filter
if query_params:
data["query_params"] = query_params
if selector: data["selector"] = selector

resp = self.post('/_replicator', data)
# Sleep required for CouchDB 3.x unit tests
time.sleep(sleepSecs)
Expand Down

0 comments on commit 0996dad

Please sign in to comment.