Skip to content

Commit

Permalink
improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Apr 29, 2024
1 parent a3c5538 commit 85c34bf
Showing 1 changed file with 49 additions and 37 deletions.
86 changes: 49 additions & 37 deletions src/OpenAgentsNode.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import grpc
import logging

from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
Expand All @@ -15,15 +14,8 @@
from threading import Condition
import requests

def cnvLogLevel(logLevel):
if logLevel == "debug": return logging.DEBUG
if logLevel == "info": return logging.INFO
if logLevel == "warn": return logging.WARNING
if logLevel == "error": return logging.ERROR
if logLevel == "fine": return logging.DEBUG
if logLevel == "finer": return logging.DEBUG
if logLevel == "finest": return logging.DEBUG
return logging.DEBUG


class BlobWriter :
def __init__(self,writeQueue,res ):
self.writeQueue = writeQueue
Expand Down Expand Up @@ -166,7 +158,7 @@ def __init__(self, filters, meta, template, sockets):
self.cachePath = None
self.logger = None

self.logger = Logger("JobRunner", self)
self.logger = Logger("JobRunner", "0", self, False)
self._filters = filters

if not isinstance(meta, str):
Expand Down Expand Up @@ -325,22 +317,29 @@ def __init__(self, nameOrMeta=None, icon=None, description=None):
self.logger = None

name = ""
icon = ""
description = ""
version = "0.0.1"

if isinstance(nameOrMeta, str):
name = nameOrMeta
else :
name = nameOrMeta["name"]
icon = nameOrMeta["picture"]
description = nameOrMeta["about"]
name = nameOrMeta["name"] if "name" in nameOrMeta else None
icon = nameOrMeta["picture"] if "picture" in nameOrMeta else None
description = nameOrMeta["about"] if "about" in nameOrMeta else None
version = nameOrMeta["version"] if "version" in nameOrMeta else None

self.nodeName = name or os.getenv('NODE_NAME', "OpenAgentsNode")
self.nodeIcon = icon or os.getenv('NODE_ICON', "")
self.nodeVersion = version or os.getenv('NODE_VERSION', "0.0.1")
self.nodeDescription = description or os.getenv('NODE_DESCRIPTION', "")

self.channel = None
self.rpcClient = None
self.logger = Logger(name)
self.logger = Logger(self.nodeName,self.nodeVersion)

def registerRunner(self, runner):
runner.logger=self.logger
runner.logger=Logger(self.nodeName+"."+runner.__class__.__name__,self.nodeVersion,runner)
self.runners.append(runner)

def getLogger(self):
Expand Down Expand Up @@ -370,7 +369,10 @@ def getClient(self):
return self.rpcClient

async def _logToJob(self, message, jobId=None):
await self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
try:
await self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
except Exception as e:
print("Error logging to job "+str(e))

def log(self,message, jobId=None):
if jobId:
Expand Down Expand Up @@ -400,42 +402,42 @@ async def executePendingJobForRunner(self , runner):
))).jobs)

for job in jobs:
if len(jobs)>0 : self.log(str(len(jobs))+" pending jobs")
else : self.log("No pending jobs")
if len(jobs)>0 : runner.getLogger().log(str(len(jobs))+" pending jobs")
else : runner.getLogger().log("No pending jobs")
wasAccepted=False
t=time.time()
try:
client = self.getClient() # Reconnect client for each job
if not await runner.canRun(job):
continue
asyncio.create_task(self._acceptJob(job.id))
await self._acceptJob(job.id)
wasAccepted = True
self.log("Job started on node "+self.nodeName, job.id)
runner.getLogger().info("Job started on node "+self.nodeName)
runner._setNode(self)
runner._setJob(job)
await runner.preRun()
async def task():
try:
output=await runner.run(job)
await runner.postRun()
self.log("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
runner.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
except Exception as e:
self.failedJobsTracker.append([job.id, time.time()])
self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
if wasAccepted:
await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
traceback.print_exc()
asyncio.create_task(task())
except Exception as e:
self.failedJobsTracker.append([job.id, time.time()])
self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
if wasAccepted:
await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
traceback.print_exc()
except Exception as e:
self.log("Error executing runner "+str(e), None)
traceback.print_exc()
runner.getLogger().error("Error executing runner "+str(e))
await asyncio.sleep(5000.0/1000.0)
self.runnerTasks[runner]=asyncio.create_task(self.executePendingJobForRunner(runner))

Expand All @@ -447,7 +449,7 @@ async def executePendingJob(self ):
if not runner in self.runnerTasks:
self.runnerTasks[runner]=asyncio.create_task(self.executePendingJobForRunner(runner))
except Exception as e:
self.log("Error executing pending job "+str(e), None)
self.getLogger().log("Error executing pending job "+str(e), None)


async def reannounce(self):
Expand All @@ -463,9 +465,9 @@ async def reannounce(self):
description = self.nodeDescription,
))
self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
self.log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
except Exception as e:
self.log("Error announcing node "+ str(e), None)
self.getLogger().error("Error announcing node "+ str(e), None)
self.nextNodeAnnounce = int(time.time()*1000) + 5000

for runner in self.runners:
Expand All @@ -478,12 +480,12 @@ async def reannounce(self):
sockets=runner._sockets
))
runner._nextAnnouncementTimestamp = int(time.time()*1000) + res.refreshInterval
self.log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
except Exception as e:
self.log("Error announcing template "+ str(e), None)
self.getLogger().error("Error announcing template "+ str(e), None)
runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000
except Exception as e:
self.log("Error reannouncing "+str(e), None)
self.getLogger().error("Error reannouncing "+str(e), None)
await asyncio.sleep(5000.0/1000.0)
asyncio.create_task(self.reannounce())

Expand Down Expand Up @@ -531,6 +533,10 @@ def log(self, level, message, timestamp=None):
'_timestamp': timestamp or int(time.time()*1000),
'message': message
}
meta=self.options["meta"] if "meta" in self.options else {}
for key in meta:
log_entry[key]=meta[key]

self.buffer.put(log_entry)
if self.buffer.qsize() >= self.batchSize:
with self.wait:
Expand Down Expand Up @@ -571,18 +577,17 @@ def flushLoop(self):


class Logger :


def __init__(self, name, runner=None):
def __init__(self, name, version, runner=None, enableOobs=True):
self.name=name or "main"
self.runner=runner
self.logger=None
self.logLevel=None
self.oobsLogger=None
self.version=version
self.logLevel = os.getenv('LOG_LEVEL', "debug")
oobsEndPoint = os.getenv('OPENOBSERVE_ENDPOINT', None)
self.oobsLogLevel= os.getenv('OPENOBSERVE_LOGLEVEL', self.logLevel)
if oobsEndPoint:
if enableOobs and oobsEndPoint:
self.oobsLogger = OpenObserveLogger({
"baseUrl": oobsEndPoint,
"org": os.getenv('OPENOBSERVE_ORG', "default"),
Expand All @@ -593,6 +598,10 @@ def __init__(self, name, runner=None):
},
"batchSize": int(os.getenv('OPENOBSERVE_BATCHSIZE', 21)),
"flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)),
"meta":{
"appName": self.name,
"appVersion": self.version
}

})

Expand All @@ -611,10 +620,12 @@ def _log(self, level, message):
levelV=self.levelToValue(level)
if levelV >=self.levelToValue(self.logLevel):
date = time.strftime("%Y-%m-%d %H:%M:%S")
print(date+" ["+self.name+"] : "+level+" : "+message)
print(date+" ["+self.name+":"+self.version+"] : "+level+" : "+message)
if self.oobsLogger and self.levelToValue(self.oobsLogLevel) >= levelV:
self.oobsLogger.log(level, message)


if self.runner and levelV >= self.levelToValue("info"):
self.runner.log(message)


def log(self, *args):
Expand All @@ -628,6 +639,7 @@ def warn(self, *args):

def error(self, *args):
self._log("error", " ".join([str(x) for x in args]))
traceback.print_exc()

def debug(self, *args):
self._log("debug", " ".join([str(x) for x in args]))
Expand Down

0 comments on commit 85c34bf

Please sign in to comment.