Skip to content

Commit

Permalink
Merge pull request #12143 from amaltaro/fix-11192
Browse files Browse the repository at this point in the history
Replace WorkQueue replication filter by selector function
  • Loading branch information
amaltaro authored Oct 17, 2024
2 parents 64c182f + 92cc8de commit f34b241
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 22 deletions.
58 changes: 37 additions & 21 deletions src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
3. Couchdb replication status (and status of its database)
4. Disk usage status
"""
from __future__ import division
from future.utils import viewitems

import os
import time
import json
import logging
import threading
from pprint import pformat
Expand Down Expand Up @@ -61,6 +62,15 @@ def __init__(self, config):
self.topicAMQ = getattr(config.AgentStatusWatcher, "topicAMQ", None)
self.hostPortAMQ = getattr(config.AgentStatusWatcher, "hostPortAMQ", [('cms-mb.cern.ch', 61313)])

# Load CouchDB replication filters
jsonDir = os.path.dirname(os.path.abspath(__file__))
replicationFile = os.path.join(jsonDir, "replication_selector.json")
if os.path.exists(replicationFile):
with open(replicationFile, 'r') as fd:
self.replicationDict = json.load(fd)
else:
raise RuntimeError(f"Could not find CouchDB replication JSON file at: {replicationFile}")

# T0 doesn't have WorkQueue, so some monitoring/replication code has to be skipped here
if hasattr(self.config, "Tier0Feeder"):
self.isT0agent = True
Expand All @@ -83,33 +93,39 @@ def setUpCouchDBReplication(self):

self.replicatorDocs = []
# set up common replication code
wmstatsSource = self.config.JobStateMachine.jobSummaryDBName
wmstatsTarget = self.config.General.centralWMStatsURL
self.replicatorDocs.append({'source': wmstatsSource, 'target': wmstatsTarget,
'filter': "WMStatsAgent/repfilter"})
self.replicatorDocs.append({'source': self.config.JobStateMachine.jobSummaryDBName,
'target': self.config.General.centralWMStatsURL,
'selector': self.replicationDict["WMStatsAgent/repfilter"]})

if self.isT0agent:
t0Source = self.config.Tier0Feeder.requestDBName
t0Target = self.config.AnalyticsDataCollector.centralRequestDBURL
self.replicatorDocs.append({'source': t0Source, 'target': t0Target,
'filter': "T0Request/repfilter"})
# Tier0 specific replication
self.replicatorDocs.append({'source': self.config.Tier0Feeder.requestDBName,
'target': self.config.AnalyticsDataCollector.centralRequestDBURL,
'selector': self.replicationDict["T0Request/repfilter"]})
else:
# set up workqueue replication
wqfilter = 'WorkQueue/queueFilter'
parentQURL = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
childURL = self.config.WorkQueueManager.queueParams["QueueURL"]
query_params = {'childUrl': childURL, 'parentUrl': sanitizeURL(parentQURL)['url']}
# Production specific workqueue replication
parentQueueUrl = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
childQueueUrl = self.config.WorkQueueManager.queueParams["QueueURL"]
localQInboxURL = "%s_inbox" % self.config.AnalyticsDataCollector.localQueueURL
self.replicatorDocs.append({'source': sanitizeURL(parentQURL)['url'], 'target': localQInboxURL,
'filter': wqfilter, 'query_params': query_params})
self.replicatorDocs.append({'source': localQInboxURL, 'target': parentQURL,
'filter': wqfilter, 'query_params': query_params})
# Update the selector filter
workqueueEscapedKey = "WMCore\.WorkQueue\.DataStructs\.WorkQueueElement\.WorkQueueElement"
self.replicationDict['WorkQueue/queueFilter'][workqueueEscapedKey]["ParentQueueUrl"] = parentQueueUrl
self.replicationDict['WorkQueue/queueFilter'][workqueueEscapedKey]["ChildQueueUrl"] = childQueueUrl
self.replicatorDocs.append({'source': parentQueueUrl,
'target': localQInboxURL,
'selector': self.replicationDict['WorkQueue/queueFilter']})
self.replicatorDocs.append({'source': localQInboxURL,
'target': parentQueueUrl,
'selector': self.replicationDict['WorkQueue/queueFilter']})

logging.info("Going to create %d new replication documents", len(self.replicatorDocs))
for rp in self.replicatorDocs:
# ensure credentials don't get exposed in the logs
msg = f"Creating continuous replication for source: {sanitizeURL(rp['source'])['url']}, "
msg += f"target: {sanitizeURL(rp['target'])['url']} and selector filter: {rp['selector']}"
logging.info(msg)
resp = self.localCouchMonitor.couchServer.replicate(rp['source'], rp['target'],
continuous=True,
filter=rp['filter'],
query_params=rp.get('query_params', False))
selector=rp.get('selector', False))
logging.info(".. response for the replication document creation was: %s", resp)

def setup(self, parameters):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

{
"_comment": "This corresponds to a set of selector filter functions for CouchDB database replication",
"WMStatsAgent/repfilter": {
"_deleted": {"$exists": false},
"_id": {"$regex": "^(?!_design/)"}
},
"T0Request/repfilter": {
"_id": {"$regex": "^(?!_design/)"}
},
"WorkQueue/queueFilter": {
"_deleted": {"$exists": false},
"type": "WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement",
"WMCore\\.WorkQueue\\.DataStructs\\.WorkQueueElement\\.WorkQueueElement":
{"ParentQueueUrl": "config.WorkQueueManager.queueParams['ParentQueueCouchUrl']",
"ChildQueueUrl": "config.WorkQueueManager.queueParams['QueueURL']"}
}
}
5 changes: 4 additions & 1 deletion src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ def connectDatabase(self, dbname='database', create=True, size=1000):

def replicate(self, source, destination, continuous=False,
create_target=False, cancel=False, doc_ids=False,
filter=False, query_params=False, sleepSecs=0):
filter=False, query_params=False, sleepSecs=0, selector=False):
"""
Trigger replication between source and destination. CouchDB options are
defined in: https://docs.couchdb.org/en/3.1.2/api/server/common.html#replicate
Expand All @@ -1039,6 +1039,7 @@ def replicate(self, source, destination, continuous=False,
this filter is expected to have been defined in the design doc.
:param query_params: dictionary of parameters to pass over to the filter function
:param sleepSecs: amount of seconds to sleep after the replication job is created
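:param selector: dictionary with a CouchDB (Mango) selector object; only documents matching it are replicated. It is evaluated natively by CouchDB (in Erlang), as an alternative to a JavaScript filter function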
:return: status of the replication creation
"""
listDbs = self.listDatabases()
Expand All @@ -1064,6 +1065,8 @@ def replicate(self, source, destination, continuous=False,
data["filter"] = filter
if query_params:
data["query_params"] = query_params
if selector: data["selector"] = selector

resp = self.post('/_replicator', data)
# Sleep required for CouchDB 3.x unit tests
time.sleep(sleepSecs)
Expand Down

0 comments on commit f34b241

Please sign in to comment.