Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace WorkQueue replication filter by selector function #12143

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
3. Couchdb replication status (and status of its database)
4. Disk usage status
"""
from __future__ import division
from future.utils import viewitems

import os
import time
import json
import logging
import threading
from pprint import pformat
Expand Down Expand Up @@ -61,6 +62,15 @@ def __init__(self, config):
self.topicAMQ = getattr(config.AgentStatusWatcher, "topicAMQ", None)
self.hostPortAMQ = getattr(config.AgentStatusWatcher, "hostPortAMQ", [('cms-mb.cern.ch', 61313)])

# Load CouchDB replication filters
jsonDir = os.path.dirname(os.path.abspath(__file__))
replicationFile = os.path.join(jsonDir, "replication_selector.json")
if os.path.exists(replicationFile):
with open(replicationFile, 'r') as fd:
self.replicationDict = json.load(fd)
else:
raise RuntimeError(f"Could not find CouchDB replication JSON file at: {replicationFile}")

# T0 doesn't have WorkQueue, so some monitoring/replication code has to be skipped here
if hasattr(self.config, "Tier0Feeder"):
self.isT0agent = True
Expand All @@ -83,33 +93,39 @@ def setUpCouchDBReplication(self):

self.replicatorDocs = []
# set up common replication code
wmstatsSource = self.config.JobStateMachine.jobSummaryDBName
wmstatsTarget = self.config.General.centralWMStatsURL
self.replicatorDocs.append({'source': wmstatsSource, 'target': wmstatsTarget,
'filter': "WMStatsAgent/repfilter"})
self.replicatorDocs.append({'source': self.config.JobStateMachine.jobSummaryDBName,
'target': self.config.General.centralWMStatsURL,
'selector': self.replicationDict["WMStatsAgent/repfilter"]})

if self.isT0agent:
t0Source = self.config.Tier0Feeder.requestDBName
t0Target = self.config.AnalyticsDataCollector.centralRequestDBURL
self.replicatorDocs.append({'source': t0Source, 'target': t0Target,
'filter': "T0Request/repfilter"})
# Tier0 specific replication
self.replicatorDocs.append({'source': self.config.Tier0Feeder.requestDBName,
'target': self.config.AnalyticsDataCollector.centralRequestDBURL,
'selector': self.replicationDict["T0Request/repfilter"]})
else:
# set up workqueue replication
wqfilter = 'WorkQueue/queueFilter'
parentQURL = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
childURL = self.config.WorkQueueManager.queueParams["QueueURL"]
query_params = {'childUrl': childURL, 'parentUrl': sanitizeURL(parentQURL)['url']}
# Production specific workqueue replication
parentQueueUrl = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
childQueueUrl = self.config.WorkQueueManager.queueParams["QueueURL"]
localQInboxURL = "%s_inbox" % self.config.AnalyticsDataCollector.localQueueURL
self.replicatorDocs.append({'source': sanitizeURL(parentQURL)['url'], 'target': localQInboxURL,
'filter': wqfilter, 'query_params': query_params})
self.replicatorDocs.append({'source': localQInboxURL, 'target': parentQURL,
'filter': wqfilter, 'query_params': query_params})
# Update the selector filter
workqueueEscapedKey = "WMCore\.WorkQueue\.DataStructs\.WorkQueueElement\.WorkQueueElement"
self.replicationDict['WorkQueue/queueFilter'][workqueueEscapedKey]["ParentQueueUrl"] = parentQueueUrl
self.replicationDict['WorkQueue/queueFilter'][workqueueEscapedKey]["ChildQueueUrl"] = childQueueUrl
self.replicatorDocs.append({'source': parentQueueUrl,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if replicator docs already have such source or target? is it better to check it first. I know that CouchDB will create a revision, but may be you don't want that. It depends on logic of handling multiple versions of the same document.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replication documents are deleted before new replications are defined (see the beginning of this method).

'target': localQInboxURL,
'selector': self.replicationDict['WorkQueue/queueFilter']})
self.replicatorDocs.append({'source': localQInboxURL,
'target': parentQueueUrl,
'selector': self.replicationDict['WorkQueue/queueFilter']})

logging.info("Going to create %d new replication documents", len(self.replicatorDocs))
for rp in self.replicatorDocs:
# ensure credentials don't get exposed in the logs
msg = f"Creating continuous replication for source: {sanitizeURL(rp['source'])['url']}, "
msg += f"target: {sanitizeURL(rp['target'])['url']} and selector filter: {rp['selector']}"
logging.info(msg)
resp = self.localCouchMonitor.couchServer.replicate(rp['source'], rp['target'],
continuous=True,
filter=rp['filter'],
query_params=rp.get('query_params', False))
selector=rp.get('selector', False))
logging.info(".. response for the replication document creation was: %s", resp)

def setup(self, parameters):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

{
"_comment": "This corresponds to a set of selector filter functions for CouchDB database replication",
"WMStatsAgent/repfilter": {
"_deleted": {"$exists": false},
"_id": {"$regex": "^(?!_design/)"}
},
"T0Request/repfilter": {
"_id": {"$regex": "^(?!_design/)"}
},
"WorkQueue/queueFilter": {
"_deleted": {"$exists": false},
"type": "WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement",
"WMCore\\.WorkQueue\\.DataStructs\\.WorkQueueElement\\.WorkQueueElement":
{"ParentQueueUrl": "config.WorkQueueManager.queueParams['ParentQueueCouchUrl']",
"ChildQueueUrl": "config.WorkQueueManager.queueParams['QueueURL']"}
}
}
5 changes: 4 additions & 1 deletion src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ def connectDatabase(self, dbname='database', create=True, size=1000):

def replicate(self, source, destination, continuous=False,
create_target=False, cancel=False, doc_ids=False,
filter=False, query_params=False, sleepSecs=0):
filter=False, query_params=False, sleepSecs=0, selector=False):
"""
Trigger replication between source and destination. CouchDB options are
defined in: https://docs.couchdb.org/en/3.1.2/api/server/common.html#replicate
Expand All @@ -1039,6 +1039,7 @@ def replicate(self, source, destination, continuous=False,
this filter is expected to have been defined in the design doc.
:param query_params: dictionary of parameters to pass over to the filter function
:param sleepSecs: amount of seconds to sleep after the replication job is created
:param selector: a new'ish feature for filter functions in Erlang
:return: status of the replication creation
"""
listDbs = self.listDatabases()
Expand All @@ -1064,6 +1065,8 @@ def replicate(self, source, destination, continuous=False,
data["filter"] = filter
if query_params:
data["query_params"] = query_params
if selector: data["selector"] = selector

resp = self.post('/_replicator', data)
# Sleep required for CouchDB 3.x unit tests
time.sleep(sleepSecs)
Expand Down