668 joblogs #43

Closed
wants to merge 19 commits into from

Changes from all commits (19 commits)
dfc939b
remove limited reconnection attempts; cap backoff time with 15 min; c…
vlerkin Nov 20, 2024
f472580
refactor code to make config import and logging configuration separat…
vlerkin Nov 20, 2024
19bfc19
change level to INFO in config instead of DEBUG to make it less confu…
vlerkin Nov 20, 2024
4761f26
remove lines to configure the logging level
vlerkin Nov 20, 2024
8792063
add logging.getLevelName() method to be used instead of creating a di…
vlerkin Nov 20, 2024
7609bb6
move logging configuration to the api.py; move config loading to the …
vlerkin Nov 22, 2024
4e0405c
rename logging_level to log_level
vlerkin Nov 22, 2024
82c55e2
remove redundant check of the log_level in the logging_config.py
vlerkin Nov 22, 2024
d47c9e8
Small changes
wvengen Nov 22, 2024
d274cac
change timeout_seconds parameter for k8s api connection from 0 to 300…
vlerkin Nov 26, 2024
f633d5a
change timeout_seconds watcher param back to 0 to try and hold connec…
vlerkin Nov 28, 2024
9c64ce4
remove job_name mapping and implement a stateless directory screen to …
vlerkin Dec 3, 2024
27317eb
remove empty line
vlerkin Dec 4, 2024
a3de0b7
make directory for logs configurable providing a default option; remo…
vlerkin Dec 17, 2024
fc50b64
describe a new parameter in CONFIG.md to provide info on how to set a…
vlerkin Dec 17, 2024
1189e80
remove job_name as it is not correct, use job_id for file names and p…
vlerkin Dec 17, 2024
87ce2c8
move logs_dir param to the joblogs section in config, adjust code to …
vlerkin Dec 17, 2024
b2dd992
remove default value from the code, make logs_dir a mandatory param i…
vlerkin Dec 17, 2024
b682edc
added functionality to remove successfully uploaded files; additionaly…
vlerkin Dec 18, 2024
4 changes: 4 additions & 0 deletions CONFIG.md
@@ -45,6 +45,10 @@ For Kubernetes, it is important to set resource limits.

TODO: explain how to set limits, with default, project and spider specificity.

### [joblogs] section
* `logs_dir` - a directory to store log files collected from the Kubernetes cluster (implemented only for Kubernetes). When configuring it, keep in mind that the Dockerfile sets `USER` to `nobody`, so not all directories are writable; a child directory under `/tmp` avoids permission problems.
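A minimal `[joblogs]` section could look as follows; `num_lines_to_check` is optional (the log handler reads it with a default of 0 and uses it to detect overlap when resuming a log stream after a reconnect), and the values shown are illustrative:

    [joblogs]
    logs_dir = /tmp/scrapyd_k8s_logs
    num_lines_to_check = 50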



### Kubernetes API interaction

3 changes: 3 additions & 0 deletions kubernetes.yaml
@@ -104,6 +104,9 @@ data:
requests_memory = 0.2G
limits_cpu = 0.8
limits_memory = 0.5G

[joblogs]
logs_dir = /tmp/scrapyd_k8s_logs
---
apiVersion: v1
kind: Secret
128 changes: 85 additions & 43 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -12,39 +12,46 @@ class KubernetesJobLogHandler:
"""
A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage.

...
This class:
- Observes Kubernetes pods for job-related events.
- Streams logs from running pods, storing them locally.
- Uploads completed job logs to object storage.
- Retrieves and concatenates log files as needed.

Attributes
----------
DEFAULT_BLOCK_SIZE : int
Default size (in bytes) of blocks to read when retrieving the last N lines from a file.
Default size (in bytes) of blocks to read when retrieving lines from a file.
config : object
Configuration object containing settings for job logs and storage.
watcher_threads : dict
Dictionary to keep track of watcher threads for each pod.
pod_tmp_mapping : dict
Mapping of pod names to their temporary log file paths.
namespace : str
Kubernetes namespace to watch pods in.
num_lines_to_check : int
Number of lines to check for matching logs when resuming log streaming.
Number of lines to check from the end of the existing log file to avoid duplicates.
object_storage_provider : LibcloudObjectStorage
Instance of the object storage provider for uploading logs.

Methods
-------
start():
Starts the pod watcher thread for job logs.
get_existing_log_filename(job_id):
Retrieves an existing temporary log file path for a given job ID.

get_last_n_lines(file_path, num_lines):
Efficiently retrieves the last `num_lines` lines from a file.

concatenate_and_delete_files(main_file_path, temp_file_path, block_size=6144):
Concatenates a temporary file to the main log file and deletes the temporary file.

make_log_filename_for_job(job_id):
Generates a unique temporary file path for storing logs of a job.
Ensures a log file exists for a given job and returns its path.

stream_logs(job_id, pod_name):
Streams logs from a Kubernetes pod and writes them to a file.
watch_pods():
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
Streams logs from a Kubernetes pod corresponding to the given job ID and writes them to a file.

handle_events(event):
Processes Kubernetes pod events to start log streaming or upload logs when pods complete.
"""
# The value was chosen to provide a balance between memory usage and the number of I/O operations
DEFAULT_BLOCK_SIZE = 6144
@@ -60,11 +67,32 @@ def __init__(self, config):
"""
self.config = config
self.watcher_threads = {}
self.pod_tmp_mapping = {}
self.namespace = config.namespace()
self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
self.logs_dir = self.config.joblogs().get('logs_dir', '').strip()  # default to '' so a missing key raises the ValueError below instead of AttributeError
if not self.logs_dir:
raise ValueError("Configuration error: 'logs_dir' is missing in joblogs configuration section.")
self.object_storage_provider = LibcloudObjectStorage(self.config)

def get_existing_log_filename(self, job_id):
"""
Retrieves the existing temporary log file path for a job without creating a new one.

Parameters
----------
job_id : str
ID of the Kubernetes job or pod, which is also used as the log file's base name.

Returns
-------
str or None
Path to the existing temporary log file for the given job, or None if no such file exists.
"""
log_file_path = os.path.join(self.logs_dir, f"{job_id}.txt")
if os.path.isfile(log_file_path):
return log_file_path
return None

def get_last_n_lines(self, file_path, num_lines):
"""
Efficiently retrieves the last `num_lines` lines from a file.
@@ -141,38 +169,46 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz
except (IOError, OSError) as e:
logger.error(f"Failed to concatenate and delete files for job: {e}")

def make_log_filename_for_job(self, job_name):
def make_log_filename_for_job(self, job_id):
"""
Generates a unique temporary file path for storing logs of a job.
Creates a log file for a job, using the job ID as the file name, or returns the path to an existing one.

Parameters
----------
job_name : str
Name of the Kubernetes job or pod.
Parameters
----------
job_id : str
ID of the Kubernetes job.

Returns
-------
str
Path to the temporary log file for the given job.
Returns
-------
str
Path to the temporary log file for the given job.
"""
if self.pod_tmp_mapping.get(job_name) is not None:
return self.pod_tmp_mapping[job_name]
temp_dir = tempfile.gettempdir()
app_temp_dir = os.path.join(temp_dir, 'job_logs')
os.makedirs(app_temp_dir, exist_ok=True)
fd, path = tempfile.mkstemp(prefix=f"{job_name}_logs_", suffix=".txt", dir=app_temp_dir)
os.close(fd)
self.pod_tmp_mapping[job_name] = path
return path

def stream_logs(self, job_name):

os.makedirs(self.logs_dir, exist_ok=True)

log_file_path = os.path.join(self.logs_dir, f"{job_id}.txt")
if os.path.exists(log_file_path):
return log_file_path

# Create an empty log file so later appends and tail reads have a target.
with open(log_file_path, 'w'):
pass

return log_file_path



def stream_logs(self, job_id, pod_name):
"""
Streams logs from a Kubernetes pod and writes them to a file.

Parameters
----------
job_name : str
Name of the Kubernetes pod to stream logs from.
job_id : str
ID of the Kubernetes job to use as a log file name.

pod_name : str
Name of the Kubernetes pod to read logs from.

Returns
-------
@@ -181,20 +217,20 @@
log_lines_counter = 0
v1 = client.CoreV1Api()
w = watch.Watch()
log_file_path = self.make_log_filename_for_job(job_name)
log_file_path = self.make_log_filename_for_job(job_id)
last_n_lines = self.get_last_n_lines(log_file_path, self.num_lines_to_check)
if len(last_n_lines) == 0:
logger.info(f"Log file '{log_file_path}' is empty or not found. Starting fresh logs for job '{job_name}'.")
logger.info(f"Log file '{log_file_path}' is empty or not found. Starting fresh logs for job '{job_id}'.")

try:
with open(log_file_path, 'a') as log_file:
temp_dir = os.path.dirname(log_file_path)
with tempfile.NamedTemporaryFile(mode='w+', delete=False, dir=temp_dir,
prefix=f"{job_name}_logs_tmp_", suffix=".txt") as temp_logs:
prefix=f"{job_id}_logs_tmp_", suffix=".txt") as temp_logs:
temp_file_path = temp_logs.name
for line in w.stream(
v1.read_namespaced_pod_log,
name=job_name,
name=pod_name,
namespace=self.namespace,
follow=True,
_preload_content=False
@@ -214,9 +250,9 @@
self.concatenate_and_delete_files(log_file_path, temp_file_path)
else:
os.remove(temp_file_path)
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_name}'.")
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_id}'.")
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_name}': {e}")
logger.exception(f"Error streaming logs for job '{job_id}': {e}")

def handle_events(self, event):
"""
@@ -241,16 +277,22 @@
else:
self.watcher_threads[thread_name] = threading.Thread(
target=self.stream_logs,
args=(pod_name,)
args=(job_id, pod_name,)
)
self.watcher_threads[thread_name].start()
elif pod.status.phase in ['Succeeded', 'Failed']:
log_filename = self.pod_tmp_mapping.get(pod_name)
log_filename = self.get_existing_log_filename(job_id)
if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
if self.object_storage_provider.object_exists(job_id):
logger.info(f"Log file for job '{job_id}' already exists in storage.")
if os.path.exists(log_filename):
os.remove(log_filename)
logger.info(
f"Removed local log file '{log_filename}' since it already exists in storage.")
else:
self.object_storage_provider.upload_file(log_filename)
os.remove(log_filename)
logger.info(f"Removed local log file '{log_filename}' after successful upload.")
else:
logger.info(f"Logfile not found for job '{job_id}'")
else:
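The body of `get_last_n_lines` is collapsed in the diff above. A minimal sketch of a block-wise tail read in the same spirit, reading fixed-size chunks from the end of the file as the `DEFAULT_BLOCK_SIZE` comment describes, could look like this (illustrative, not the PR's exact implementation):

    import os

    def get_last_n_lines(file_path, num_lines, block_size=6144):
        # Walk backwards from the end of the file, one block at a time,
        # until enough newlines have been collected.
        if num_lines <= 0:
            return []
        try:
            with open(file_path, 'rb') as f:
                f.seek(0, os.SEEK_END)
                end = f.tell()
                data = b''
                while end > 0 and data.count(b'\n') <= num_lines:
                    read_size = min(block_size, end)
                    end -= read_size
                    f.seek(end)
                    data = f.read(read_size) + data
        except OSError:
            return []
        lines = data.splitlines()
        return [line.decode('utf-8', errors='replace') for line in lines[-num_lines:]]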
26 changes: 15 additions & 11 deletions scrapyd_k8s/k8s_resource_watcher.py
@@ -35,6 +35,7 @@ def __init__(self, namespace, config):
# Create the lock before the watcher thread starts, so notify_subscribers never races on an unset attribute.
self._lock = threading.Lock()
self._stop_event = threading.Event()
self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
self.watcher_thread.start()
logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")

def subscribe(self, callback: Callable):
@@ -46,9 +47,10 @@ def subscribe(self, callback: Callable):
callback : Callable
A function to call when an event is received.
"""
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")
with self._lock:
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")

def unsubscribe(self, callback: Callable):
"""
@@ -59,9 +61,10 @@ def unsubscribe(self, callback: Callable):
callback : Callable
The subscriber function to remove.
"""
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")
with self._lock:
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")

def notify_subscribers(self, event: dict):
"""
@@ -72,11 +75,12 @@
event : dict
The Kubernetes event data.
"""
for subscriber in self.subscribers:
try:
subscriber(event)
except Exception as e:
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
with self._lock:
for subscriber in self.subscribers:
try:
subscriber(event)
except Exception as e:
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")

def watch_pods(self):
"""
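This diff does not show how the watcher and the log handler are wired together; a minimal sketch, assuming the names introduced above and a `config` object exposing `namespace()` and `joblogs()` as used in this PR, could look like:

    from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
    from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher

    # Illustrative wiring only; the real integration lives elsewhere in scrapyd-k8s.
    watcher = ResourceWatcher(namespace=config.namespace(), config=config)
    handler = KubernetesJobLogHandler(config)
    watcher.subscribe(handler.handle_events)  # stream logs while pods run, upload when they finish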