Skip to content

Commit

Permalink
Add proper logging
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Apr 29, 2024
1 parent be90b32 commit 7bb39a7
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 50 deletions.
227 changes: 193 additions & 34 deletions src/OpenAgentsNode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import grpc
import logging

from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
import time
Expand All @@ -8,8 +10,20 @@
import asyncio
import pickle
import queue
import base64
import concurrent

from threading import Condition
import requests

def cnvLogLevel(logLevel):
if logLevel == "debug": return logging.DEBUG
if logLevel == "info": return logging.INFO
if logLevel == "warn": return logging.WARNING
if logLevel == "error": return logging.ERROR
if logLevel == "fine": return logging.DEBUG
if logLevel == "finer": return logging.DEBUG
if logLevel == "finest": return logging.DEBUG
return logging.DEBUG
class BlobWriter :
def __init__(self,writeQueue,res ):
self.writeQueue = writeQueue
Expand Down Expand Up @@ -137,18 +151,22 @@ def getUrl(self):
return self.url

class JobRunner:
_filters = None
_node = None
_job = None
_disksByUrl = {}
_disksById = {}
_diskByName = {}
_template = None
_meta = None
_sockets = None
_nextAnnouncementTimestamp = 0
cachePath = None

def __init__(self, filters, meta, template, sockets):
self._filters = None
self._node = None
self._job = None
self._disksByUrl = {}
self._disksById = {}
self._diskByName = {}
self._template = None
self._meta = None
self._sockets = None
self._nextAnnouncementTimestamp = 0
self.cachePath = None
self.logger = None

self.logger = Logger("JobRunner", self)
self._filters = filters

if not isinstance(meta, str):
Expand All @@ -168,6 +186,9 @@ def __init__(self, filters, meta, template, sockets):
if not os.path.exists(self.cachePath):
os.makedirs(self.cachePath)


def getLogger(self,name=None):
return self.logger

async def cacheSet(self, path, value, version=0, expireAt=0, local=False):
try:
Expand All @@ -194,7 +215,7 @@ def write_data():
res=await client.cacheSet(write_data())
return res.success
except Exception as e:
print("Error setting cache "+str(e))
self.getLogger().error("Error setting cache "+str(e))
return False


Expand Down Expand Up @@ -222,7 +243,7 @@ async def cacheGet(self, path, lastVersion = 0, local=False):
bytesOut.extend(chunk.data)
return pickle.loads(bytesOut)
except Exception as e:
print("Error getting cache "+str(e))
self.getLogger().error("Error getting cache "+str(e))
return None

def _setNode(self, node):
Expand All @@ -235,8 +256,7 @@ def log(self, message):
if self._job: message+=" for job "+self._job.id
if self._node:
self._node.log(message, self._job.id if self._job else None)
else:
print(message)


async def openStorage(self, url):
if url in self._disksByUrl:
Expand Down Expand Up @@ -289,19 +309,23 @@ async def run(self, job):
pass

class OpenAgentsNode:
nextNodeAnnounce = 0
nodeName = ""
nodeIcon = ""
nodeDescription = ""
channel = None
rpcClient = None
runners=[]
poolAddress = None
poolPort = None
failedJobsTracker = []
isLooping = False

def __init__(self, nameOrMeta=None, icon=None, description=None):
self.nextNodeAnnounce = 0
self.nodeName = ""
self.nodeIcon = ""
self.nodeDescription = ""
self.channel = None
self.rpcClient = None
self.runners=[]
self.poolAddress = None
self.poolPort = None
self.failedJobsTracker = []
self.isLooping = False
self.logger = None

name = ""

if isinstance(nameOrMeta, str):
name = nameOrMeta
else :
Expand All @@ -313,19 +337,24 @@ def __init__(self, nameOrMeta=None, icon=None, description=None):
self.nodeDescription = description or os.getenv('NODE_DESCRIPTION', "")
self.channel = None
self.rpcClient = None
self.logger = Logger(name)

def registerRunner(self, runner):
runner.logger=self.logger
self.runners.append(runner)

def getLogger(self):
return self.logger

def getClient(self):
if self.channel is None or self.channel._channel.check_connectivity_state(True) == grpc.ChannelConnectivity.SHUTDOWN:
if self.channel is not None:
try:
self.getLogger().info("Closing channel")
self.channel.close()
except Exception as e:
print("Error closing channel "+str(e))
print("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))
self.getLogger().error("Error closing channel "+str(e))
self.getLogger().info("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))

options=[
# 20 MB
Expand All @@ -340,14 +369,12 @@ def getClient(self):
self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
return self.rpcClient

async def _log(self, message, jobId=None):
async def _logToJob(self, message, jobId=None):
await self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))

def log(self,message, jobId=None):
print(message)
if jobId:
#self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
asyncio.create_task(self._log(message, jobId))
asyncio.create_task(self._logToJob(message, jobId))

async def _acceptJob(self, jobId):
await self.getClient().acceptJob(rpc_pb2.RpcAcceptJob(jobId=jobId))
Expand Down Expand Up @@ -482,4 +509,136 @@ async def run(self, poolAddress=None, poolPort=None, poolSsl=False):
while True:
await self.executePendingJob()
await asyncio.sleep(1000.0/1000.0)


class OpenObserveLogger:

def __init__(self, options):
self.options = options
self.batchSize= self.options["batchSize"]
self.flushInterval = self.options["flushInterval"]
if not self.flushInterval:
self.flushInterval = 5000
if not self.batchSize:
self.batchSize = 21
self.buffer = queue.Queue()
self.wait = Condition()
self.flushThread = concurrent.futures.ThreadPoolExecutor(max_workers=1)
self.flushThread.submit(self.flushLoop)

def log(self, level, message, timestamp=None):
log_entry = {
'level': level,
'_timestamp': timestamp or int(time.time()*1000),
'message': message
}
self.buffer.put(log_entry)
if self.buffer.qsize() >= self.batchSize:
with self.wait:
self.wait.notify_all()


def flushLoop(self):
while True:
with self.wait:
self.wait.wait(self.flushInterval/1000)

batch = []
while not self.buffer.empty():
batch.append(self.buffer.get())

try:
url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+"test"+"/_json"

basicAuth = self.options["auth"]
if not isinstance(basicAuth, str):
if "username" in basicAuth and "password" in basicAuth:
basicAuth = basicAuth["username"]+":"+basicAuth["password"]
basicAuth = base64.b64encode(basicAuth.encode()).decode()

headers = {
'Content-Type': 'application/json',
"Authorization": "Basic "+basicAuth if basicAuth else None
}

res = requests.post(url, headers=headers, json=batch)
if res.status_code != 200:
print("Error flushing log "+str(res.status_code))

except Exception as e:
print("Error flushing log "+str(e))




class Logger :


def __init__(self, name, runner=None):
self.name=name or "main"
self.runner=runner
self.logger=None
self.logLevel=None
self.oobsLogger=None
self.logLevel = os.getenv('LOG_LEVEL', "debug")
oobsEndPoint = os.getenv('OPENOBSERVE_ENDPOINT', None)
self.oobsLogLevel= os.getenv('OPENOBSERVE_LOGLEVEL', self.logLevel)
if oobsEndPoint:
self.oobsLogger = OpenObserveLogger({
"baseUrl": oobsEndPoint,
"org": os.getenv('OPENOBSERVE_ORG', "default"),
"stream": os.getenv('OPENOBSERVE_STREAM', "default"),
"auth": os.getenv('OPENOBSERVE_BASICAUTH', None) or {
"username": os.getenv('OPENOBSERVE_USERNAME', None),
"password": os.getenv('OPENOBSERVE_PASSWORD', None)
},
"batchSize": int(os.getenv('OPENOBSERVE_BATCHSIZE', 21)),
"flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)),

})

def levelToValue(self, level):
if level == "error": return 8
if level == "warn": return 7
if level == "info": return 6
if level == "verbose": return 5
if level == "debug": return 4
if level == "fine": return 3
if level == "finer": return 2
if level == "finest": return 1
return level

def _log(self, level, message):
levelV=self.levelToValue(level)
if levelV >=self.levelToValue(self.logLevel):
date = time.strftime("%Y-%m-%d %H:%M:%S")
print(date+" ["+self.name+"] : "+level+" : "+message)
if self.oobsLogger and self.levelToValue(self.oobsLogLevel) >= levelV:
self.oobsLogger.log(level, message)



def log(self, *args):
self._log("debug", " ".join([str(x) for x in args]))

def info(self, *args):
self._log("info", " ".join([str(x) for x in args]))

def warn(self, *args):
self._log("warn", " ".join([str(x) for x in args]))

def error(self, *args):
self._log("error", " ".join([str(x) for x in args]))

def debug(self, *args):
self._log("debug", " ".join([str(x) for x in args]))

def fine(self, *args):
self._log("fine", " ".join([str(x) for x in args]))

def finer(self, *args):
self._log("finer", " ".join([str(x) for x in args]))

def finest(self, *args):
self._log("finest", " ".join([str(x) for x in args]))


Loading

0 comments on commit 7bb39a7

Please sign in to comment.