Skip to content

Commit

Permalink
Merge pull request DIRACGrid#217 from martynia/devel_janusz_pilot_log…
Browse files Browse the repository at this point in the history
…sWrapperJSON_sync_timer.dev

[devel] feat: add log flushing with a timer.
  • Loading branch information
fstagni authored Oct 31, 2023
2 parents 4eb18f0 + 3b3a50a commit 32f6238
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 30 deletions.
13 changes: 10 additions & 3 deletions Pilot/dirac-pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,18 @@
log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions))

log.info("Executing commands: %s" % str(pilotParams.commands))
if pilotParams.pilotLogging:
log.buffer.flush()

if pilotParams.pilotLogging:
# It is safer to cancel the timer here: each command has its own logger object, whose timer is cancelled by the
# finaliser. No timer is needed in the "else" code segment below.
try:
log.buffer.cancelTimer()
log.debug("Timer canceled")
log.buffer.flush()
except Exception as exc:
log.error(str(exc))
for commandName in pilotParams.commands:
command, module = getCommand(pilotParams, commandName, log)
command, module = getCommand(pilotParams, commandName)
if command is not None:
command.log.info("Command %s instantiated from %s" % (commandName, module))
command.execute()
Expand Down
17 changes: 14 additions & 3 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, pilotParams):
import stat
import sys
import time
import traceback
from collections import Counter

############################
Expand Down Expand Up @@ -82,19 +83,29 @@ def wrapper(self):
return func(self)

try:
return func(self)
ret = func(self)
self.log.buffer.flush()
return ret

except SystemExit as exCode: # or Exception ?
# controlled exit
pRef = self.pp.pilotReference
self.log.info(
"Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
)
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit() and return).
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()).
try:
sendMessage(self.log.url, self.log.pilotUUID, "finaliseLogs", {"retCode": str(exCode)})
except Exception as exc:
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
raise

except Exception as exc:
# unexpected exit: document it and bail out.
self.log.error(str(exc))
self.log.error(traceback.format_exc())
raise
finally:
self.log.buffer.cancelTimer()
return wrapper


Expand Down
91 changes: 70 additions & 21 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import threading
from datetime import datetime
from distutils.version import LooseVersion
from functools import partial
from functools import partial, wraps
from threading import RLock

############################
# python 2 -> 3 "hacks"
Expand Down Expand Up @@ -52,6 +53,12 @@
except NameError:
FileNotFoundError = OSError

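# Python 2.7 issue: threading.Timer is a factory function there, not a class, so import the underlying
# _Timer class instead (a class is needed because Timer is subclassed below).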
if sys.version_info.major == 2:
from threading import _Timer as Timer # pylint: disable=no-name-in-module
else:
from threading import Timer

# Utilities functions


Expand Down Expand Up @@ -215,7 +222,7 @@ def listdir(directory):

def getFlavour(ceName):

pilotReference = os.environ.get("DIRAC_PILOT_STAMP", '')
pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "")
flavour = "DIRAC"

# # Batch systems
Expand Down Expand Up @@ -270,12 +277,7 @@ def getFlavour(ceName):
if "SSHBATCH_JOBID" in os.environ and "SSH_NODE_HOST" in os.environ:
flavour = "SSHBATCH"
pilotReference = (
"sshbatchhost://"
+ ceName
+ "/"
+ os.environ["SSH_NODE_HOST"]
+ "/"
+ os.environ["SSHBATCH_JOBID"]
"sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"]
)

# ARC
Expand Down Expand Up @@ -359,7 +361,7 @@ def loadObject(self, package, moduleName, command):
return None, None


def getCommand(params, commandName, log):
def getCommand(params, commandName):
"""Get an instantiated command object for execution.
Commands are looked in the following modules in the order:
Expand Down Expand Up @@ -453,7 +455,7 @@ def __init__(
pilotOutput="pilot.out",
isPilotLoggerOn=True,
pilotUUID="unknown",
setup="DIRAC-Certification",
flushInterval=10,
):
"""
c'tor
Expand All @@ -465,7 +467,7 @@ def __init__(
self.pilotUUID = pilotUUID
self.isPilotLoggerOn = isPilotLoggerOn
sendToURL = partial(sendMessage, url, pilotUUID, "sendMessage")
self.buffer = FixedSizeBuffer(sendToURL)
self.buffer = FixedSizeBuffer(sendToURL, autoflush=flushInterval)

def debug(self, msg, header=True, sendPilotLog=False):
super(RemoteLogger, self).debug(msg, header)
Expand Down Expand Up @@ -505,26 +507,51 @@ def sendMessage(self, msg):
super(RemoteLogger, self).error(str(err))


def synchronized(func):
    """
    Method decorator: run the decorated method inside ``with self._rlock``.

    The instance the method is bound to must provide a ``_rlock`` attribute usable as a context manager.

    :param func: method to protect
    :type func: func
    :return: wrapped method carrying the metadata (name, docstring) of ``func``
    :rtype: func
    """

    def lockedCall(self, *args, **kwargs):
        # The lock is released by the context manager on return as well as on exception.
        with self._rlock:
            return func(self, *args, **kwargs)

    return wraps(func)(lockedCall)


class RepeatingTimer(Timer):
    """
    A Timer which calls its function repeatedly, once per ``interval``, instead of only once.
    """

    def run(self):
        """
        Thread body: wait on the ``finished`` event for ``interval`` seconds and call the function each time
        the wait times out. The loop ends as soon as the event is found set.

        :return: None
        :rtype: None
        """
        cancelled = self.finished.wait(self.interval)
        while not cancelled:
            self.function(*self.args, **self.kwargs)
            cancelled = self.finished.wait(self.interval)


class FixedSizeBuffer(object):
"""
A buffer with a (preferred) fixed number of lines.
Once it's full, a message is sent to a remote server and the buffer is renewed.
"""

def __init__(self, senderFunc, bufsize=10):
def __init__(self, senderFunc, bufsize=10, autoflush=10):
"""
Constructor.
:param senderFunc: a function used to send a message
:type senderFunc: func
:param bufsize: size of the buffer (in lines)
:type bufsize: int
:param autoflush: buffer flush period in seconds
:type autoflush: int
"""

self._rlock = RLock()
if autoflush > 0:
self._timer = RepeatingTimer(autoflush, self.flush)
self._timer.start()
else:
self._timer = None
self.output = StringIO()
self.bufsize = bufsize
self.__nlines = 0
self._nlines = 0
self.senderFunc = senderFunc

@synchronized
def write(self, text):
"""
Write text to a string buffer. Newline characters are counted and number of lines in the buffer
Expand All @@ -539,36 +566,49 @@ def write(self, text):
if self.output.closed:
self.output = StringIO()
self.output.write(text)
self.__nlines += max(1, text.count("\n"))
self._nlines += max(1, text.count("\n"))
self.sendFullBuffer()

@synchronized
def getValue(self):
    """
    Get the current content of the buffer.

    :return: whatever ``self.output.getvalue()`` returns
    """
    return self.output.getvalue()

@synchronized
def sendFullBuffer(self):
"""
Get the buffer content, send a message, close the current buffer and re-create a new one for subsequent writes.
"""

if self.__nlines >= self.bufsize:
if self._nlines >= self.bufsize:
self.flush()
self.output = StringIO()

@synchronized
def flush(self):
    """
    Flush the buffer, pass its content to the sender function, reset the line counter and close the buffer.
    Nothing is done when the buffer is already closed.

    :return: None
    :rtype: None
    """
    if self.output.closed:
        # Already flushed and closed; there is nothing to send.
        return
    self.output.flush()
    self.senderFunc(self.getValue())
    self._nlines = 0
    self.output.close()

def cancelTimer(self):
    """
    Cancel the repeating timer if it exists.

    :return: None
    :rtype: None
    """
    timer = self._timer
    if timer is None:
        # No timer is held by this buffer, so there is nothing to cancel.
        return
    timer.cancel()


def sendMessage(url, pilotUUID, method, rawMessage):
Expand Down Expand Up @@ -614,13 +654,18 @@ def __init__(self, pilotParams, dummy=""):
isPilotLoggerOn = pilotParams.pilotLogging
self.debugFlag = pilotParams.debugFlag
loggerURL = pilotParams.loggerURL
interval = pilotParams.loggerTimerInterval

if loggerURL is None:
self.log = Logger(self.__class__.__name__, debugFlag=self.debugFlag)
else:
# remote logger
self.log = RemoteLogger(
loggerURL, self.__class__.__name__, pilotUUID=pilotParams.pilotUUID, debugFlag=self.debugFlag
loggerURL,
self.__class__.__name__,
pilotUUID=pilotParams.pilotUUID,
debugFlag=self.debugFlag,
flushInterval=interval,
)

self.log.isPilotLoggerOn = isPilotLoggerOn
Expand Down Expand Up @@ -804,6 +849,7 @@ def __init__(self):
self.pilotCFGFile = "pilot.json"
self.pilotLogging = False
self.loggerURL = None
self.loggerTimerInterval = 600
self.pilotUUID = "unknown"
self.modules = "" # see dirac-install "-m" option documentation
self.userEnvVariables = "" # see dirac-install "--userEnvVariables" option documentation
Expand Down Expand Up @@ -1053,11 +1099,14 @@ def __initJSON2(self):
if pilotLogging is not None:
self.pilotLogging = pilotLogging.upper() == "TRUE"
self.loggerURL = pilotOptions.get("RemoteLoggerURL")
# logger buffer flush interval in seconds.
self.loggerTimerInterval = pilotOptions.get("RemoteLoggerTimerInterval", self.loggerTimerInterval)
pilotLogLevel = pilotOptions.get("PilotLogLevel", "INFO")
if pilotLogLevel.lower() == "debug":
self.debugFlag = True
self.log.debug("JSON: Remote logging: %s" % self.pilotLogging)
self.log.debug("JSON: Remote logging URL: %s" % self.loggerURL)
self.log.debug("JSON: Remote logging buffer flush interval in sec.(0: disabled): %s" % self.loggerTimerInterval)
self.log.debug("JSON: Remote/local logging debug flag: %s" % self.debugFlag)

# CE type if present, then Defaults, otherwise as defined in the code:
Expand Down
13 changes: 10 additions & 3 deletions Pilot/tests/Test_simplePilotLogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import tempfile

try:
from Pilot.pilotTools import CommandBase, PilotParams
from Pilot.pilotTools import CommandBase, PilotParams, RemoteLogger
except ImportError:
from pilotTools import CommandBase, PilotParams
from pilotTools import CommandBase, PilotParams, RemoteLogger

import unittest

Expand Down Expand Up @@ -144,7 +144,14 @@ def test_executeAndGetOutput(self, popenMock, argvmock):
self.stderr_mock.write("Errare humanum est!")
self.stderr_mock.seek(0)
pp = PilotParams()
cBase = CommandBase(pp)
try:
cBase = CommandBase(pp)
# we have a logger URL set, so:
assert isinstance(cBase.log, RemoteLogger)
finally:
# and cancel the timer !
cBase.log.buffer.cancelTimer()

popenMock.return_value.stdout = self.stdout_mock
popenMock.return_value.stderr = self.stderr_mock
outData = cBase.executeAndGetOutput("dummy")
Expand Down

0 comments on commit 32f6238

Please sign in to comment.