chg: [logarchive] support for parsing multiple logarchive folders #100
cvandeplas committed Sep 26, 2024
1 parent 9915d8b commit c08ffe1
Showing 2 changed files with 169 additions and 6 deletions.
104 changes: 98 additions & 6 deletions parsers/logarchive.py
@@ -15,7 +15,7 @@
import subprocess
import sys
import tempfile

import shutil

# --------------------------------------------#

@@ -55,12 +55,12 @@ def get_log_files(self) -> list:

@DeprecationWarning
def execute(self) -> list | dict:
# OK, this is really inefficient as we're reading a file that we just wrote to a temporary folder
# OK, this is really inefficient as we're reading everything into memory, writing it to a temporary file on disk, and re-reading it all over again
# but who cares, nobody uses this function anyway...
try:
with tempfile.TemporaryDirectory() as tmp_outpath:
tmp_output_file = os.path.join(tmp_outpath, 'logarchive.tmp')  # TemporaryDirectory() yields the path itself, not an object with .name
LogarchiveParser.parse_folder_to_file(self.get_log_files()[0], tmp_output_file)
LogarchiveParser.parse_all_to_file(self.get_log_files(), tmp_output_file)
with open(tmp_output_file, 'r') as f:
return [json.loads(line) for line in f]
except IndexError:
@@ -96,9 +96,101 @@ def save_result(self, force: bool = False, indent=None):
# the result was already computed, just save it now
super().save_result(force, indent)
else:
# no caching
# TODO implement support of multiple folders, see issue #100
LogarchiveParser.parse_folder_to_file(self.get_log_files()[0], self.output_file)
LogarchiveParser.parse_all_to_file(self.get_log_files(), self.output_file)

def merge_files(temp_files: list, output_file: str):
for temp_file in temp_files:
first_entry, last_entry = LogarchiveParser.get_first_and_last_entries(temp_file['file'].name)
temp_file['first_timestamp'] = first_entry['time']
temp_file['last_timestamp'] = last_entry['time']

# primary sort key: lowest first timestamp; secondary key: highest last timestamp
temp_files.sort(key=lambda x: (x['first_timestamp'], -x['last_timestamp']))

# do the merging magic here:
# copy the first file over as-is, then open the output file in append mode.
# For each following file:
# - if current_last < prev_last: skip it, we already have all of its data
# - elif current_first > prev_last: copy over the full file, prev_last = current_last
# - else: scan to the first entry past prev_last and copy over only the new data
# Continue with the remaining files using the same logic.
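# Illustrative example (mirrors the unit test below): given files covering
# times [1..7], [4..6] and [7..10], the [4..6] file is skipped entirely and
# from the [7..10] file only the entries with time > 7 are appended.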
prev_temp_file = temp_files[0]
# first copy over first file to self.output_file
shutil.copyfile(prev_temp_file['file'].name, output_file)
with open(output_file, 'a') as f_out:
i = 1
while i < len(temp_files):
current_temp_file = temp_files[i]
if current_temp_file['last_timestamp'] < prev_temp_file['last_timestamp']:
# skip file as we already have all the data
# no need to update the prev_temp_file variable
pass
elif current_temp_file['first_timestamp'] > prev_temp_file['last_timestamp']:
# copy over the full file
with open(current_temp_file['file'].name, 'r') as f_in:
for line in f_in:
f_out.write(line)
prev_temp_file = current_temp_file
else:
# need to seek to prev_last and copy over new data
with open(current_temp_file['file'].name, 'r') as f_in:
copy_over = False  # tracks whether we have reached the new data; spares us a json.loads() on every remaining line once we have started copying
for line in f_in:
if not copy_over and json.loads(line)['time'] > prev_temp_file['last_timestamp']:
copy_over = True
if copy_over:
f_out.write(line)
prev_temp_file = current_temp_file
i += 1

def get_first_and_last_entries(output_file: str) -> tuple:
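# Illustrative: for a JSONL file whose first line is {"time": 1, ...} and
# whose last line is {"time": 7, ...}, this returns those two parsed entries
# without reading any of the lines in between.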
with open(output_file, 'rb') as f:
first_entry = json.loads(f.readline().decode())
# discover last line efficiently
f.seek(-2, os.SEEK_END) # Move the pointer to the second-to-last byte in the file
# Move backwards until a newline character is found, or we hit the start of the file
while f.tell() > 0:
char = f.read(1)
if char == b'\n':
break
f.seek(-2, os.SEEK_CUR) # Move backwards

# Read the last line
last_entry = json.loads(f.readline().decode())

return (first_entry, last_entry)

def parse_all_to_file(folders: list, output_file: str):
# no caching
# simple mode: only one folder
if len(folders) == 1:
LogarchiveParser.parse_folder_to_file(folders[0], output_file)
return

# complex mode: multiple folders, need to merge multiple files
# for each of the log folders
# - parse it to a temporary file, keep track of the file reference or name
# - keep track of the first and last timestamp of each file
# - order the files, and if a file contains a subset of another one, skip it.
# this is a tough one, as we may have partially overlapping timeframes, so we may need to re-assemble in a smart way.
# - once we know the order, bring the files together to the final single output file
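# Note: each temp_files entry is a dict of the form {'file': NamedTemporaryFile};
# merge_files() later adds the 'first_timestamp' and 'last_timestamp' keys to each entry.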

temp_files = []
try:
for folder in folders:
temp_file = tempfile.NamedTemporaryFile(delete_on_close=False)  # delete_on_close requires Python 3.12+
LogarchiveParser.parse_folder_to_file(folder, temp_file.name)
temp_files.append({
'file': temp_file,
})

# merge files to the output file
LogarchiveParser.merge_files(temp_files, output_file)

finally:
# delete all the temp files, making sure nothing is left behind
for temp_file in temp_files:
os.remove(temp_file['file'].name)

def parse_folder_to_file(input_folder: str, output_file: str) -> bool:
try:
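For context, a minimal usage sketch of the new entry point follows; the import path mirrors the file layout above, and the folder paths are hypothetical:

from parsers.logarchive import LogarchiveParser

# two hypothetical logarchive folders extracted from separate sysdiagnose archives
folders = [
    'case/sysdiagnose_A/system_logs.logarchive',
    'case/sysdiagnose_B/system_logs.logarchive',
]
# parse each folder to a temp file, then merge into a single timestamp-ordered
# JSONL output file, skipping fully-overlapping timeframes
LogarchiveParser.parse_all_to_file(folders, 'logarchive.jsonl')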
71 changes: 71 additions & 0 deletions tests/test_parsers_logarchive.py
@@ -3,6 +3,7 @@
import os
import unittest
import json
import tempfile


class TestParsersLogarchive(SysdiagnoseTestCase):
@@ -106,6 +107,76 @@ def test_convert_entry_to_unified(self):
self.maxDiff = None
self.assertDictEqual(result, expected_output)

def test_merge_files(self):
input = []
input.append([
{'time': 1, 'data': 'a'},
{'time': 2, 'data': 'b'},
{'time': 3, 'data': 'c'},
{'time': 4, 'data': 'd'},
{'time': 5, 'data': 'e'},
{'time': 6, 'data': 'f'},
{'time': 7, 'data': 'g'},
])
input.append([
{'time': 1, 'data': 'a-wrong'},
{'time': 2, 'data': 'b-wrong'},
{'time': 3, 'data': 'c-wrong'},
{'time': 4, 'data': 'd-wrong'},
{'time': 5, 'data': 'e-wrong'},
{'time': 6, 'data': 'f-wrong'},
{'time': 7, 'data': 'g-wrong'},
])

input.append([ # should be ignored
{'time': 4, 'data': 'd-wrong'},
{'time': 5, 'data': 'e-wrong'},
{'time': 6, 'data': 'f-wrong'},
])
input.append([  # overlaps the first list, so only its entries with time > 7 are taken
{'time': 7, 'data': 'g-wrong'},
{'time': 8, 'data': 'h'},
{'time': 9, 'data': 'i'},
{'time': 10, 'data': 'j'},
])
input.append([ # small gap
{'time': 12, 'data': 'k'},
{'time': 13, 'data': 'l'},
{'time': 14, 'data': 'm'},
])

expected_output = [{'time': 1, 'data': 'a'}, {'time': 2, 'data': 'b'}, {'time': 3, 'data': 'c'}, {'time': 4, 'data': 'd'}, {'time': 5, 'data': 'e'}, {'time': 6, 'data': 'f'}, {'time': 7, 'data': 'g'}, {'time': 8, 'data': 'h'}, {'time': 9, 'data': 'i'}, {'time': 10, 'data': 'j'}, {'time': 12, 'data': 'k'}, {'time': 13, 'data': 'l'}, {'time': 14, 'data': 'm'}]

temp_files = []
try:
# write the files with the test data
for file in input:
temp_file = tempfile.NamedTemporaryFile(delete_on_close=False)
temp_files.append({
'file': temp_file,
})
for entry in file:
temp_file.write(json.dumps(entry).encode())
temp_file.write(b'\n')
temp_file.close()
# merge the files
output_file = tempfile.NamedTemporaryFile(delete_on_close=False)
output_file.close()
LogarchiveParser.merge_files(temp_files, output_file.name)

# read the output file
result = []
with open(output_file.name, 'r') as f:
for line in f:
result.append(json.loads(line))

finally:
for temp_file in temp_files:
os.remove(temp_file['file'].name)
os.remove(output_file.name)

self.assertListEqual(result, expected_output)


if __name__ == '__main__':
unittest.main()
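The new test can be run on its own; the dotted path below follows the file and class names shown above:

python -m unittest tests.test_parsers_logarchive.TestParsersLogarchive.test_merge_files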
