Skip to content

Commit

Permalink
Merge pull request #227 from martynia/devel_janusz_pilot_logs_7411
Browse files Browse the repository at this point in the history
[devel] Remote logging: modify logger buffer size and implement CE white list (address Dirac #7411)
  • Loading branch information
fstagni committed Jan 24, 2024
2 parents 8f2617f + 977b98b commit e0138d6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Pilot/dirac-pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
if not sys.stdin.isatty():
receivedContent = sys.stdin.read()
log = RemoteLogger(
pilotParams.loggerURL, "Pilot", pilotUUID=pilotParams.pilotUUID, debugFlag=pilotParams.debugFlag
pilotParams.loggerURL, "Pilot", bufsize=pilotParams.loggerBufsize, pilotUUID=pilotParams.pilotUUID, debugFlag=pilotParams.debugFlag
)
log.info("Remote logger activated")
log.buffer.write(receivedContent)
Expand Down
23 changes: 20 additions & 3 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ def __init__(
isPilotLoggerOn=True,
pilotUUID="unknown",
flushInterval=10,
bufsize=1000,
):
"""
c'tor
Expand All @@ -523,7 +524,7 @@ def __init__(
self.pilotUUID = pilotUUID
self.isPilotLoggerOn = isPilotLoggerOn
sendToURL = partial(sendMessage, url, pilotUUID, "sendMessage")
self.buffer = FixedSizeBuffer(sendToURL, autoflush=flushInterval)
self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval)

def debug(self, msg, header=True, sendPilotLog=False):
super(RemoteLogger, self).debug(msg, header)
Expand Down Expand Up @@ -584,7 +585,7 @@ class FixedSizeBuffer(object):
Once it's full, a message is sent to a remote server and the buffer is renewed.
"""

def __init__(self, senderFunc, bufsize=10, autoflush=10):
def __init__(self, senderFunc, bufsize=1000, autoflush=10):
"""
Constructor.
Expand Down Expand Up @@ -649,7 +650,7 @@ def flush(self):
:return: None
:rtype: None
"""
if not self.output.closed:
if not self.output.closed and self._nlines > 0:
self.output.flush()
buf = self.getValue()
self.senderFunc(buf)
Expand Down Expand Up @@ -723,6 +724,7 @@ def __init__(self, pilotParams, dummy=""):
# URL present and the flag is set:
isPilotLoggerOn = pilotParams.pilotLogging and (loggerURL is not None)
interval = pilotParams.loggerTimerInterval
bufsize = pilotParams.loggerBufsize

if not isPilotLoggerOn:
self.log = Logger(self.__class__.__name__, debugFlag=self.debugFlag)
Expand All @@ -734,6 +736,7 @@ def __init__(self, pilotParams, dummy=""):
pilotUUID=pilotParams.pilotUUID,
debugFlag=self.debugFlag,
flushInterval=interval,
bufsize=bufsize,
)

self.log.isPilotLoggerOn = isPilotLoggerOn
Expand Down Expand Up @@ -919,6 +922,7 @@ def __init__(self):
self.pilotLogging = False
self.loggerURL = None
self.loggerTimerInterval = 0
self.loggerBufsize = 1000
self.pilotUUID = "unknown"
self.modules = "" # see dirac-install "-m" option documentation
self.userEnvVariables = "" # see dirac-install "--userEnvVariables" option documentation
Expand Down Expand Up @@ -1215,13 +1219,26 @@ def __initJSON2(self):
self.loggerURL = pilotOptions.get("RemoteLoggerURL")
# logger buffer flush interval in seconds.
self.loggerTimerInterval = int(pilotOptions.get("RemoteLoggerTimerInterval", self.loggerTimerInterval))
# logger buffer size in lines:
self.loggerBufsize = max(1, int(pilotOptions.get("RemoteLoggerBufsize", self.loggerBufsize)))
# logger CE white list
loggerCEsWhiteList = pilotOptions.get("RemoteLoggerCEsWhiteList")
# restrict remote logging to whitelisted CEs ([] or None => no restriction)
self.log.debug("JSON: Remote logging CE white list: %s" % loggerCEsWhiteList)
if loggerCEsWhiteList is not None:
if not isinstance(loggerCEsWhiteList, list):
loggerCEsWhiteList = [elem.strip() for elem in loggerCEsWhiteList.split(",")]
if self.ceName not in loggerCEsWhiteList:
self.pilotLogging = False
self.log.debug("JSON: Remote logging disabled for this CE: %s" % self.ceName)
pilotLogLevel = pilotOptions.get("PilotLogLevel", "INFO")
if pilotLogLevel.lower() == "debug":
self.debugFlag = True
self.log.debug("JSON: Remote logging: %s" % self.pilotLogging)
self.log.debug("JSON: Remote logging URL: %s" % self.loggerURL)
self.log.debug("JSON: Remote logging buffer flush interval in sec.(0: disabled): %s" % self.loggerTimerInterval)
self.log.debug("JSON: Remote/local logging debug flag: %s" % self.debugFlag)
self.log.debug("JSON: Remote logging buffer size (lines): %s" % self.loggerBufsize)

# CE type if present, then Defaults, otherwise as defined in the code:
if "Commands" in pilotOptions:
Expand Down
1 change: 1 addition & 0 deletions tests/CI/pilot_newSchema.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"GenericPilotGroup": "dteam_pilot",
"GenericPilotDN": "VAR_USERDN",
"RemoteLogging": "True",
"RemoteLoggerCEsWhiteList": "jenkins.cern.ch, jenkins-mp-pool.cern.ch",
"RemoteLoggerURL": "https://lbcertifdirac70.cern.ch:8443/WorkloadManagement/TornadoPilotLogging",
"RemoteLoggerTimerInterval": 0,
"PilotLogLevel": "DEBUG",
Expand Down

0 comments on commit e0138d6

Please sign in to comment.