improved logs handling, addresses #237
- added log context in many places
- extended log context with branch, flow kind, flow, run and agent
- added a retention policy for logs per branch
- added presentation of logs on branch, flow, run and agent pages
godfryd committed Feb 18, 2023
1 parent 8c8c007 commit 7438cbc
Showing 27 changed files with 961 additions and 281 deletions.
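
Note: the changes below all revolve around a small logging-context API in the agent (log.set_ctx / log.reset_ctx). As a rough mental model only — a minimal sketch assuming a contextvars-based design, not the actual kraken.agent.log code — such an API could look like this:

import contextvars
import logging

_log_ctx = contextvars.ContextVar('log_ctx', default={})

def set_ctx(**fields):
    # Merge fields such as agent=, branch=, flow_kind=, flow=, run=, job=
    # into the current logging context.
    ctx = dict(_log_ctx.get())
    ctx.update(fields)
    _log_ctx.set(ctx)

def reset_ctx():
    # Drop all context fields, e.g. between jobs.
    _log_ctx.set({})

class CtxFilter(logging.Filter):
    # Attach the current context to every record so a JSON handler can
    # forward it together with message, path, lineno and level.
    def filter(self, record):
        record.ctx = _log_ctx.get()
        return True

Every log record emitted while a context is set then carries those ids, which is what lets the new ClickHouse columns further below be filled in.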
6 changes: 5 additions & 1 deletion agent/kraken/agent/agent.py
@@ -139,6 +139,8 @@ def run():
             break
         log.warning('agent is not authorized, sleeping for 10s')
         time.sleep(10)
+    agent_id = resp.get('agent_id', None)
+    log.set_ctx(agent=agent_id)
 
     one_job = config.get('one_job', False)
     if one_job:
@@ -156,7 +158,8 @@ def run():
             update.update_agent(version)
 
         if job:
-            log.set_ctx(job=job['id'], run=job['run_id'])
+            log.set_ctx(job=job['id'], run=job['run_id'], flow=job['flow_id'],
+                        flow_kind=job['flow_kind'], branch=job['branch_id'])
 
             _enable_masking_secrets_in_logs(job)
 
@@ -180,3 +183,4 @@ def run():
             log.exception('ignored exception in agent main loop')
             time.sleep(5)
         log.reset_ctx()
+        log.set_ctx(agent=agent_id)
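
The pattern at the end of the main loop — reset_ctx() followed immediately by set_ctx(agent=agent_id) — drops the job-scoped fields but keeps tagging inter-job log lines with the agent id. An equivalent, purely illustrative scoping idiom (get_next_job and process are hypothetical placeholders, not Kraken functions):

log.set_ctx(agent=agent_id)
while True:
    job = get_next_job()  # hypothetical helper
    if not job:
        continue
    log.set_ctx(job=job['id'], run=job['run_id'], flow=job['flow_id'],
                flow_kind=job['flow_kind'], branch=job['branch_id'])
    try:
        process(job)  # hypothetical helper
    finally:
        log.reset_ctx()              # drop the job-scoped fields...
        log.set_ctx(agent=agent_id)  # ...but keep the agent id on every line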
13 changes: 2 additions & 11 deletions agent/kraken/agent/docker_run.py
@@ -240,13 +240,10 @@ def _async_run(self, cmd, deadline, cwd='/', user='root'):
                      exit_code, cmd, str(cwd), str(user), t0, t1, timeout))
         return logs
 
-    async def _dkr_run(self, proc_coord, cmd, cwd, deadline, user, log_ctx=None):
+    async def _dkr_run(self, proc_coord, cmd, cwd, deadline, user):
         t0, t1, timeout = utils.get_times(deadline)
         log.info("cmd '%s' in '%s', now %s, deadline %s, time: %ds", cmd, cwd, t0, t1, timeout)
 
-        if log_ctx:
-            log.set_ctx(**log_ctx)
-
         exe = self.cntr.client.api.exec_create(self.cntr.id, cmd, workdir=cwd, user=user, environment=None)
         sock = self.cntr.client.api.exec_start(exe['Id'], socket=True)
 
@@ -318,9 +315,6 @@ async def _dkr_run(self, proc_coord, cmd, cwd, deadline, user, log_ctx=None):
         if logs_to_print:
             log.info(logs_to_print)
 
-        if log_ctx:
-            log.reset_ctx()
-
         if proc_coord and proc_coord.is_canceled:
             exit_code = 10001
             log.info('CANCELED')
@@ -382,20 +376,17 @@ async def _async_run_exc(self, proc_coord, tool_path, return_addr, step, step_fi
         with open(step_file_path) as f:
             data = f.read()
         step = json.loads(data)
-        log_ctx = dict(job=step['job_id'], step=step['index'], tool=step['tool'])
 
         # run tool
         deadline = time.time() + timeout
         try:
-            await self._dkr_run(proc_coord, cmd, docker_cwd, deadline, user, log_ctx)
+            await self._dkr_run(proc_coord, cmd, docker_cwd, deadline, user)
         except Timeout:
             if proc_coord.result == {}:
                 t0, t1, timeout = utils.get_times(deadline)
                 log.info('job time expired, now: %s, deadline: %s', t0, t1)
                 proc_coord.result = {'status': 'error', 'reason': 'job-timeout'}
 
-        log.reset_ctx()
-
 
     async def async_run(self, proc_coord, tool_path, return_addr, step, step_file_path, command, cwd, timeout, user):  # pylint: disable=unused-argument
         try:
2 changes: 2 additions & 0 deletions agent/kraken/agent/jobber.py
@@ -489,6 +489,8 @@ def run(srv, job):
         log.set_ctx(step=idx)
 
         step['job_id'] = job['id']
+        step['branch_id'] = job['branch_id']
+        step['flow_kind'] = job['flow_kind']
         step['flow_id'] = job['flow_id']
         step['run_id'] = job['run_id']
         if 'trigger_data' in job:
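
With these two lines the step dict handed to each tool now carries the full id chain, which is presumably why the executors below (docker_run, local_run, lxd_run) can drop their explicit log_ctx plumbing: the context is set once in the agent process and inherited. A step now includes at least these fields (values invented for illustration; real steps carry more keys such as 'tool' and 'index'):

step = {
    'job_id': 123,
    'branch_id': 1,
    'flow_kind': 0,   # numeric kind, stored as UInt8 in ClickHouse below
    'flow_id': 45,
    'run_id': 678,
}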
7 changes: 2 additions & 5 deletions agent/kraken/agent/local_run.py
@@ -73,7 +73,6 @@ async def _async_run_exc(self, proc_coord, tool_path, return_addr, step, step_fi
         with open(step_file_path) as f:
             data = f.read()
         step = json.loads(data)
-        log_ctx = dict(job=step['job_id'], step=step['index'], tool=step['tool'])
 
         self.proc_coord = proc_coord
         self.cmd = cmd
@@ -89,7 +88,7 @@ async def _async_run_exc(self, proc_coord, tool_path, return_addr, step, step_fi
                                            start_new_session=True)
 
         try:
-            await self._async_pump_output(proc.stdout, log_ctx)
+            await self._async_pump_output(proc.stdout)
 
             if timeout:
                 await asyncio.wait([proc.wait(), self._async_monitor_proc(proc, timeout * 0.95)],
@@ -116,7 +115,7 @@ async def async_run(self, proc_coord, tool_path, return_addr, step, step_file_pa
             log.exception('passing up')
             raise
 
-    async def _async_pump_output(self, stream, log_ctx):
+    async def _async_pump_output(self, stream):
         while True:
             try:
                 line = await stream.readline()
@@ -125,9 +124,7 @@ async def _async_pump_output(self, stream, log_ctx):
                 continue
             if line:
                 line = line.decode().rstrip()
-                log.set_ctx(**log_ctx)
                 log.info(line)
-                log.reset_ctx()
             else:
                 break
 
13 changes: 0 additions & 13 deletions agent/kraken/agent/lxd_run.py
@@ -56,12 +56,9 @@ def __init__(self, job):
         self.client = None
         self.lab_net = None
         self.cntr = None
-        self.log_ctx = None
         self.logs = []
 
     def _start(self, timeout):
-        log.set_ctx(job=self.job['id'])
-
         self.client = pylxd.Client()
 
         # prepare network for container
@@ -165,15 +162,8 @@ def _stdout_handler(self, chunk):
 
         self.logs.append(chunk)
 
-        if self.log_ctx:
-            log.set_ctx(**self.log_ctx)
-
         log.info(chunk.rstrip())
 
-        if self.log_ctx:
-            log.reset_ctx()
-            log.set_ctx(job=self.job['id'])
-
     def _async_run(self, cmd, deadline, cwd='/', env=None):
         logs, exit_code = asyncio.run(self._lxd_run(cmd, cwd, deadline, env))
         if exit_code != 0:
@@ -241,7 +231,6 @@ async def _async_run_exc(self, proc_coord, tool_path, return_addr, step, step_fi
         with open(step_file_path) as f:
             data = f.read()
         step = json.loads(data)
-        self.log_ctx = dict(job=step['job_id'], step=step['index'], tool=step['tool'])
 
         deadline = time.time() + timeout
         try:
@@ -251,8 +240,6 @@ async def _async_run_exc(self, proc_coord, tool_path, return_addr, step, step_fi
             if proc_coord.result == {}:
                 proc_coord.result = {'status': 'error', 'reason': 'job-timeout'}
 
-        self.log_ctx = None
-
 
     async def async_run(self, proc_coord, tool_path, return_addr, step, step_file_path, command, cwd, timeout, user):  # pylint: disable=unused-argument
         try:
36 changes: 28 additions & 8 deletions clickhouse-proxy/src/main.rs
@@ -37,11 +37,13 @@ struct LogEntryOut {
     lineno: u32,
     level: String,
     branch: u64,
+    flow_kind: u8,
     flow: u64,
     run: u64,
     job: u64,
     tool: String,
-    step: i8
+    step: i8,
+    agent: u64,
 }
 
 const KRAKEN_VERSION: &str = env!("KRAKEN_VERSION");
@@ -75,12 +77,14 @@ async fn read_and_parse_log(buf: [u8; 65536], len: usize, mut tx: Sender<LogEntr
             path: le_in["path"].as_str().unwrap().to_string(),
             lineno: le_in["lineno"].as_i64().unwrap() as u32,
             level: le_in["level"].as_str().unwrap().to_string(),
-            branch: 0,
-            flow: 0,
-            run: 0,
+            branch: le_in.get("branch").map_or(0, |v| v.as_u64().unwrap()),
+            flow_kind: le_in.get("flow_kind").map_or(0, |v| v.as_u64().unwrap() as u8),
+            flow: le_in.get("flow").map_or(0, |v| v.as_u64().unwrap()),
+            run: le_in.get("run").map_or(0, |v| v.as_u64().unwrap()),
             job: le_in.get("job").map_or(0, |v| v.as_u64().unwrap()),
             tool: le_in.get("tool").map_or("".to_string(), |v| v.as_str().unwrap().to_string()),
             step: le_in.get("step").map_or(-1, |v| v.as_i64().unwrap() as i8),
+            agent: le_in.get("agent").map_or(0, |v| v.as_u64().unwrap()),
         };
 
         let res = tx.send(le_out).await;
@@ -105,12 +109,14 @@ async fn store_logs_batch(client: &Client, batch: &Vec<LogEntryOut>) -> Result<(
             path: le.path.clone(),
             lineno: le.lineno,
             level: le.level.clone(),
-            branch: 0,
-            flow: 0,
-            run: 0,
+            branch: le.branch,
+            flow_kind: le.flow_kind,
+            flow: le.flow,
+            run: le.run,
             job: le.job,
             tool: le.tool.clone(),
             step: le.step,
+            agent: le.agent,
         };
         // println!("msg {:?} {:?}", idx, le2.message);
         insert.write(&le2).await?;
@@ -195,11 +201,25 @@ async fn store_logs(rx: &mut Receiver<LogEntryOut>) -> Result<()> {
         db_version += 1;
     }
 
+    // migration to version 6
+    if db_version < 6 {
+        let cmd = r"ALTER TABLE logs ADD COLUMN flow_kind UInt8 AFTER branch";
+        client.query(cmd).execute().await?;
+        db_version += 1;
+    }
+
+    // migration to version 7
+    if db_version < 7 {
+        let cmd = r"ALTER TABLE logs ADD COLUMN agent UInt64 AFTER step";
+        client.query(cmd).execute().await?;
+        db_version += 1;
+    }
+
     // store latest version
     let insert_version = r"INSERT INTO db_schema_version (id, version) VALUES (1, ?)";
     client.query(insert_version).bind(db_version).execute().await?;
 
-    println!("logs table created in clickhouse");
+    println!("logs table created or updated in clickhouse, now db version {:?}", db_version);
     println!("waiting for logs to store");
 
     let mut batch = Vec::with_capacity(100);
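
On the wire, each log entry reaches the proxy as a single JSON document; absent context fields default to 0 (and step to -1), as the map_or calls above show. A hedged producer-side sketch — it assumes the proxy listens for JSON datagrams on UDP, which the fixed 65536-byte buffer suggests but this diff does not confirm, and the host/port are hypothetical:

import json
import socket

entry = {
    'message': 'step finished',
    'path': 'kraken/agent/jobber.py',
    'lineno': 489,
    'level': 'INFO',
    # new context fields parsed above; any that are missing default to 0 (step: -1)
    'branch': 1, 'flow_kind': 0, 'flow': 45, 'run': 678,
    'job': 123, 'tool': 'shell', 'step': 2, 'agent': 7,
}
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(json.dumps(entry).encode(), ('clickhouse-proxy', 9001))  # hypothetical address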
24 changes: 24 additions & 0 deletions (new Alembic migration, revision cea0e48abb5a)
@@ -0,0 +1,24 @@
"""added retention_policy to branch
Revision ID: cea0e48abb5a
Revises: 5c16623f4d55
Create Date: 2023-02-16 08:06:50.864851
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'cea0e48abb5a'
down_revision = '5c16623f4d55'
branch_labels = None
depends_on = None


def upgrade():
op.add_column('branches', sa.Column('retention_policy', postgresql.JSONB(astext_type=sa.Text()), nullable=True))


def downgrade():
op.drop_column('branches', 'retention_policy')
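
The new column is plain JSONB, so the shape of a retention policy is not fixed by this migration. A purely hypothetical example of setting one (the policy keys here are invented; only the column itself is defined by this diff):

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost/kraken')  # assumed DSN
with engine.begin() as conn:
    conn.execute(
        text('UPDATE branches SET retention_policy = CAST(:pol AS JSONB) WHERE id = :id'),
        {'pol': '{"ci_logs_days": 30, "dev_logs_days": 10}', 'id': 1},  # invented keys
    )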
20 changes: 13 additions & 7 deletions server/kraken/server/backend.py
@@ -102,6 +102,8 @@ def _handle_get_job(agent):
                                                  after=commits[0]['id'])]
 
     # attach storage info to job
+    job['branch_id'] = agent.job.run.flow.branch_id
+    job['flow_kind'] = agent.job.run.flow.kind
     job['flow_id'] = agent.job.run.flow_id
     job['run_id'] = agent.job.run_id
 
@@ -324,13 +326,13 @@ def destroy_machine_if_needed(agent, job):
         q = q.filter(Job.finished.isnot(None))
         q = q.filter(Job.finished > agent.created)
         jobs_num = q.count()
-        log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs)
+        log.info('JOB %s, num %d, max %d', job, jobs_num, max_jobs)
         if jobs_num >= max_jobs:
             to_destroy = True
 
     # aws ecs fargate
     elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE:
-        log.info('ECS FARGATE JOB %s - destroying task: %d', job.id, agent.id)
+        log.info('ECS FARGATE JOB %s - destroying task: %d', job, agent.id)
         to_destroy = True
 
     # azure vm
@@ -343,13 +345,13 @@ def destroy_machine_if_needed(agent, job):
         q = q.filter(Job.finished.isnot(None))
         q = q.filter(Job.finished > agent.created)
         jobs_num = q.count()
-        log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs)
+        log.info('JOB %s, num %d, max %d', job, jobs_num, max_jobs)
         if jobs_num >= max_jobs:
             to_destroy = True
 
     # kubernetes
     elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_K8S:
-        log.info('K8S JOB %s - destroying pod: %d', job.id, agent.id)
+        log.info('K8S JOB %s - destroying pod: %d', job, agent.id)
         to_destroy = True
 
     # schedule destruction if needed
@@ -543,9 +545,11 @@ def _handle_host_info(agent, req): # pylint: disable=unused-argument
     agent.host_info = req['info']
     db.session.commit()
 
+    resp = dict(agent_id=agent.id)
+
     if sys:
         # agent has already identified system so it doesn't need to be created
-        return
+        return resp
 
     try:
         System(name=system, executor='local')
@@ -554,6 +558,8 @@ def _handle_host_info(agent, req): # pylint: disable=unused-argument
         if not isinstance(e.orig, UniqueViolation):
             log.exception('IGNORED')
 
+    return resp
+
 
 def _handle_keep_alive(agent, req): # pylint: disable=unused-argument
     job = agent.job
@@ -591,6 +597,7 @@ def _handle_unknown_agent(address, ip_address, agent):
 
 
 def _serve_agent_request():
+    log.reset_ctx()
     req = request.get_json()
     # log.info('request headers: %s', request.headers)
     # log.info('request remote_addr: %s', request.remote_addr)
@@ -632,8 +639,7 @@ def _serve_agent_request():
         response = _handle_dispatch_tests(agent, req)
 
     elif msg == consts.AGENT_MSG_HOST_INFO:
-        _handle_host_info(agent, req)
-        response = {}
+        response = _handle_host_info(agent, req)
 
     elif msg == consts.AGENT_MSG_KEEP_ALIVE:
         response = _handle_keep_alive(agent, req)
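
Together with the agent.py hunk at the top, this closes a small handshake: _handle_host_info now returns the agent's database id, and the agent feeds it straight into its log context, so inter-job log lines on the agent can still be attributed to it. The response shape is minimal:

# shape returned by _handle_host_info and consumed in agent.py above
resp = {'agent_id': 7}                 # illustrative value
agent_id = resp.get('agent_id', None)
log.set_ctx(agent=agent_id)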