From dfc939bd887675c87124fa4d45022d64f1e457fc Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 20 Nov 2024 11:39:44 +0100
Subject: [PATCH 01/19] remove limited reconnection attempts; cap backoff time
 at 15 min; configure logging in a separate file logging_config.py that is
 imported into __main__.py to trigger log configuration before other modules
 inherit it; add debug logs to the watcher to better monitor each step

---
 scrapyd_k8s/logging_config.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)
 create mode 100644 scrapyd_k8s/logging_config.py

diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
new file mode 100644
index 0000000..a4dfbb3
--- /dev/null
+++ b/scrapyd_k8s/logging_config.py
@@ -0,0 +1,13 @@
+import logging
+import sys
+
+def setup_logging():
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
+        handlers=[
+            logging.StreamHandler(sys.stdout)
+        ]
+    )
+
+setup_logging()

From f472580eb4f2343982828e9a51095479bf3e821c Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 20 Nov 2024 13:43:46 +0100
Subject: [PATCH 02/19] refactor code to make config import and logging
 configuration separate modules; configure logging globally but keep a
 separate logger for each module; make the logging level configurable in the
 config file

---
 kubernetes.yaml               |  2 ++
 scrapyd_k8s.sample-k8s.conf   |  3 +++
 scrapyd_k8s/__main__.py       |  7 +++++--
 scrapyd_k8s/api.py            |  2 +-
 scrapyd_k8s/config_loader.py  |  3 +++
 scrapyd_k8s/logging_config.py | 30 +++++++++++++++++++++++-------
 6 files changed, 37 insertions(+), 10 deletions(-)
 create mode 100644 scrapyd_k8s/config_loader.py

diff --git a/kubernetes.yaml b/kubernetes.yaml
index 75813a6..f6ce531 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -89,6 +89,8 @@ data:
     namespace = default
     max_proc = 2
+
+    logging_level = DEBUG
 
     # This is an example spider that should work out of the box.
     # Adapt the spider config to your use-case.
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf
index 9ab3536..2ee7016 100644
--- a/scrapyd_k8s.sample-k8s.conf
+++ b/scrapyd_k8s.sample-k8s.conf
@@ -22,6 +22,9 @@ namespace = default
 # Maximum number of jobs running in parallel
 max_proc = 10
 
+# Set the level for logging (e.g., INFO, DEBUG)
+logging_level = DEBUG
+
 # For each project, define a project section.
 # This contains a repository that points to the remote container repository.
 # An optional env_secret is the name of a secret with additional environment
diff --git a/scrapyd_k8s/__main__.py b/scrapyd_k8s/__main__.py
index 370999f..27ac39a 100644
--- a/scrapyd_k8s/__main__.py
+++ b/scrapyd_k8s/__main__.py
@@ -1,4 +1,7 @@
-from .api import run
-
+from .config_loader import config
+from .logging_config import setup_logging
 if __name__ == "__main__":
+    logging_level = config.scrapyd().get('logging_level', 'INFO')
+    setup_logging(logging_level)
+    from .api import run  # Import after logging is configured
     run()
diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py
index df013b8..9c8d20a 100644
--- a/scrapyd_k8s/api.py
+++ b/scrapyd_k8s/api.py
@@ -5,6 +5,7 @@
 from flask_basicauth import BasicAuth
 from natsort import natsort_keygen, ns
 
+
 # setup logging before anything else
 from .config import Config
 from .logging import setup_logging
@@ -17,7 +18,6 @@
 launcher = (config.launcher_cls())(config)
 scrapyd_config = config.scrapyd()
 
-
 @app.get("/")
 def home():
     return """
 <h1>scrapyd-k8s</h1>
 """
diff --git a/scrapyd_k8s/config_loader.py b/scrapyd_k8s/config_loader.py
new file mode 100644
index 0000000..073b33b
--- /dev/null
+++ b/scrapyd_k8s/config_loader.py
@@ -0,0 +1,3 @@
+from .config import Config
+
+config = Config()
\ No newline at end of file
diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
index a4dfbb3..aa1f9b2 100644
--- a/scrapyd_k8s/logging_config.py
+++ b/scrapyd_k8s/logging_config.py
@@ -1,13 +1,29 @@
 import logging
 import sys
 
-def setup_logging():
+VALID_LOG_LEVELS = {
+    'CRITICAL': logging.CRITICAL,
+    'ERROR': logging.ERROR,
+    'WARNING': logging.WARNING,
+    'INFO': logging.INFO,
+    'DEBUG': logging.DEBUG,
+    'NOTSET': logging.NOTSET,
+}
+
+def setup_logging(logging_level):
+    if not logging_level:
+        logging_level = 'INFO'  # Default to INFO if logging_level is None
+
+    level_name = str(logging_level).upper()
+
+    if level_name not in VALID_LOG_LEVELS:
+        valid_levels_str = ', '.join(VALID_LOG_LEVELS.keys())
+        raise ValueError(
+            f"Invalid logging level '{logging_level}'. Valid levels are: {valid_levels_str}"
+        )
+
     logging.basicConfig(
-        level=logging.DEBUG,
+        level=VALID_LOG_LEVELS[level_name],
         format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
-        handlers=[
-            logging.StreamHandler(sys.stdout)
-        ]
+        handlers=[logging.StreamHandler(sys.stdout)]
     )
-
-setup_logging()

From 19bfc19610062d2e46d65da15361169c8eddbdec Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 20 Nov 2024 13:54:13 +0100
Subject: [PATCH 03/19] change level to INFO in config instead of DEBUG to
 make it less confusing for users

---
 kubernetes.yaml             | 2 +-
 scrapyd_k8s.sample-k8s.conf | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kubernetes.yaml b/kubernetes.yaml
index f6ce531..5f9f400 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -90,7 +90,7 @@ data:
     max_proc = 2
 
-    logging_level = DEBUG
+    logging_level = INFO
 
     # This is an example spider that should work out of the box.
     # Adapt the spider config to your use-case.
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf
index 2ee7016..9bcaaa6 100644
--- a/scrapyd_k8s.sample-k8s.conf
+++ b/scrapyd_k8s.sample-k8s.conf
@@ -23,7 +23,7 @@ namespace = default
 max_proc = 10
 
 # Set the level for logging (e.g., INFO, DEBUG)
-logging_level = DEBUG
+logging_level = INFO
 
 # For each project, define a project section.
 # This contains a repository that points to the remote container repository.

From 4761f2699e768ff36bb22a51bd472b6b68db9fca Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 20 Nov 2024 14:09:26 +0100
Subject: [PATCH 04/19] remove lines to configure the logging level

---
 kubernetes.yaml             | 2 --
 scrapyd_k8s.sample-k8s.conf | 3 ---
 2 files changed, 5 deletions(-)

diff --git a/kubernetes.yaml b/kubernetes.yaml
index 5f9f400..75813a6 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -89,8 +89,6 @@ data:
     namespace = default
     max_proc = 2
-
-    logging_level = INFO
 
     # This is an example spider that should work out of the box.
     # Adapt the spider config to your use-case.
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf
index 9bcaaa6..9ab3536 100644
--- a/scrapyd_k8s.sample-k8s.conf
+++ b/scrapyd_k8s.sample-k8s.conf
@@ -22,9 +22,6 @@ namespace = default
 # Maximum number of jobs running in parallel
 max_proc = 10
 
-# Set the level for logging (e.g., INFO, DEBUG)
-logging_level = INFO
-
 # For each project, define a project section.
 # This contains a repository that points to the remote container repository.
 # An optional env_secret is the name of a secret with additional environment
From 879206351e6febc0c0270014e2d6a32decc2e1be Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 20 Nov 2024 14:14:15 +0100
Subject: [PATCH 05/19] add logging.getLevelName() method to be used instead
 of creating a dict with all available logging levels to validate user input
 from the config file

---
 scrapyd_k8s/logging_config.py | 19 ++++---------------
 1 file changed, 4 insertions(+), 15 deletions(-)

diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
index aa1f9b2..5b8a65a 100644
--- a/scrapyd_k8s/logging_config.py
+++ b/scrapyd_k8s/logging_config.py
@@ -1,29 +1,18 @@
 import logging
 import sys
 
-VALID_LOG_LEVELS = {
-    'CRITICAL': logging.CRITICAL,
-    'ERROR': logging.ERROR,
-    'WARNING': logging.WARNING,
-    'INFO': logging.INFO,
-    'DEBUG': logging.DEBUG,
-    'NOTSET': logging.NOTSET,
-}
-
 def setup_logging(logging_level):
     if not logging_level:
         logging_level = 'INFO'  # Default to INFO if logging_level is None
 
     level_name = str(logging_level).upper()
-
-    if level_name not in VALID_LOG_LEVELS:
-        valid_levels_str = ', '.join(VALID_LOG_LEVELS.keys())
+    numeric_level = logging.getLevelName(level_name)
+    if not isinstance(numeric_level, int):
         raise ValueError(
-            f"Invalid logging level '{logging_level}'. Valid levels are: {valid_levels_str}"
+            f"Invalid logging level '{logging_level}'."
         )
-
     logging.basicConfig(
-        level=VALID_LOG_LEVELS[level_name],
+        level=numeric_level,
         format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
         handlers=[logging.StreamHandler(sys.stdout)]
     )

From 7609bb6bd96f63a128e5f555977c7526dd2787ec Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Fri, 22 Nov 2024 09:57:07 +0100
Subject: [PATCH 06/19] move logging configuration to api.py; move config
 loading to api.py; keep logging_config file

---
 scrapyd_k8s/__main__.py       | 7 ++-----
 scrapyd_k8s/api.py            | 2 +-
 scrapyd_k8s/config_loader.py  | 3 ---
 scrapyd_k8s/logging_config.py | 2 +-
 4 files changed, 4 insertions(+), 10 deletions(-)
 delete mode 100644 scrapyd_k8s/config_loader.py

diff --git a/scrapyd_k8s/__main__.py b/scrapyd_k8s/__main__.py
index 27ac39a..370999f 100644
--- a/scrapyd_k8s/__main__.py
+++ b/scrapyd_k8s/__main__.py
@@ -1,7 +1,4 @@
-from .config_loader import config
-from .logging_config import setup_logging
+from .api import run
+
 if __name__ == "__main__":
-    logging_level = config.scrapyd().get('logging_level', 'INFO')
-    setup_logging(logging_level)
-    from .api import run  # Import after logging is configured
     run()
diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py
index 9c8d20a..df013b8 100644
--- a/scrapyd_k8s/api.py
+++ b/scrapyd_k8s/api.py
@@ -5,7 +5,6 @@
 from flask_basicauth import BasicAuth
 from natsort import natsort_keygen, ns
 
-
 # setup logging before anything else
 from .config import Config
 from .logging import setup_logging
@@ -17,6 +17,7 @@
 launcher = (config.launcher_cls())(config)
 scrapyd_config = config.scrapyd()
 
+
 @app.get("/")
 def home():
     return """
 <h1>scrapyd-k8s</h1>
 """
diff --git a/scrapyd_k8s/config_loader.py b/scrapyd_k8s/config_loader.py
deleted file mode 100644
index 073b33b..0000000
--- a/scrapyd_k8s/config_loader.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from .config import Config
-
-config = Config()
\ No newline at end of file
diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
index 5b8a65a..faaa79c 100644
--- a/scrapyd_k8s/logging_config.py
+++ b/scrapyd_k8s/logging_config.py
@@ -15,4 +15,4 @@ def setup_logging(logging_level):
         level=numeric_level,
         format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
         handlers=[logging.StreamHandler(sys.stdout)]
-    )
+    )
\ No newline at end of file

From 4e0405c28013186f403a318ef6870b3ebac7ee43 Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Fri, 22 Nov 2024 10:33:53 +0100
Subject: [PATCH 07/19] rename logging_level to log_level

---
 scrapyd_k8s/logging_config.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
index faaa79c..22186d2 100644
--- a/scrapyd_k8s/logging_config.py
+++ b/scrapyd_k8s/logging_config.py
@@ -1,15 +1,15 @@
 import logging
 import sys
 
-def setup_logging(logging_level):
-    if not logging_level:
-        logging_level = 'INFO'  # Default to INFO if logging_level is None
+def setup_logging(log_level):
+    if not log_level:
+        log_level = 'INFO'  # Default to INFO if logging_level is None
 
-    level_name = str(logging_level).upper()
+    level_name = str(log_level).upper()
     numeric_level = logging.getLevelName(level_name)
     if not isinstance(numeric_level, int):
         raise ValueError(
-            f"Invalid logging level '{logging_level}'."
+            f"Invalid logging level '{log_level}'."
         )
     logging.basicConfig(
         level=numeric_level,

From 82c55e22fc97bc8f4d3bc2af79857fb13f921aca Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Fri, 22 Nov 2024 12:15:32 +0100
Subject: [PATCH 08/19] remove redundant check of the log_level in
 logging_config.py

---
 scrapyd_k8s/logging_config.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
index 22186d2..3b2c9cb 100644
--- a/scrapyd_k8s/logging_config.py
+++ b/scrapyd_k8s/logging_config.py
@@ -2,9 +2,6 @@
 import sys
 
 def setup_logging(log_level):
-    if not log_level:
-        log_level = 'INFO'  # Default to INFO if logging_level is None
-
     level_name = str(log_level).upper()
     numeric_level = logging.getLevelName(level_name)
     if not isinstance(numeric_level, int):
From d47c9e8c81917b7adad46b3fda4d3db407b0af2d Mon Sep 17 00:00:00 2001
From: wvengen
Date: Fri, 22 Nov 2024 19:09:46 +0100
Subject: [PATCH 09/19] Small changes

---
 scrapyd_k8s/logging_config.py | 15 ---------------
 1 file changed, 15 deletions(-)
 delete mode 100644 scrapyd_k8s/logging_config.py

diff --git a/scrapyd_k8s/logging_config.py b/scrapyd_k8s/logging_config.py
deleted file mode 100644
index 3b2c9cb..0000000
--- a/scrapyd_k8s/logging_config.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import logging
-import sys
-
-def setup_logging(log_level):
-    level_name = str(log_level).upper()
-    numeric_level = logging.getLevelName(level_name)
-    if not isinstance(numeric_level, int):
-        raise ValueError(
-            f"Invalid logging level '{log_level}'."
-        )
-    logging.basicConfig(
-        level=numeric_level,
-        format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
-        handlers=[logging.StreamHandler(sys.stdout)]
-    )
\ No newline at end of file

From d274cacceaa6b705ae649d56bee33d26397d9299 Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Tue, 26 Nov 2024 10:50:06 +0100
Subject: [PATCH 10/19] change timeout_seconds parameter for k8s api
 connection from 0 to 300 to reset the connection periodically; add lock for
 events like subscribe, unsubscribe and event dispatching

---
 scrapyd_k8s/k8s_resource_watcher.py | 28 ++++++++++++++++------------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py
index 836560b..812cf0e 100644
--- a/scrapyd_k8s/k8s_resource_watcher.py
+++ b/scrapyd_k8s/k8s_resource_watcher.py
@@ -35,6 +35,7 @@ def __init__(self, namespace, config):
         self._stop_event = threading.Event()
         self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
         self.watcher_thread.start()
+        self._lock = threading.Lock()
         logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")
 
     def subscribe(self, callback: Callable):
         """
         Adds a callback to the subscribers list.
 
         Parameters
         ----------
         callback : Callable
             A function to call when an event is received.
         """
-        if callback not in self.subscribers:
-            self.subscribers.append(callback)
-            logger.debug(f"Subscriber {callback.__name__} added.")
+        with self._lock:
+            if callback not in self.subscribers:
+                self.subscribers.append(callback)
+                logger.debug(f"Subscriber {callback.__name__} added.")
 
     def unsubscribe(self, callback: Callable):
         """
         Removes a callback from the subscribers list.
 
         Parameters
         ----------
         callback : Callable
             The subscriber function to remove.
         """
-        if callback in self.subscribers:
-            self.subscribers.remove(callback)
-            logger.debug(f"Subscriber {callback.__name__} removed.")
+        with self._lock:
+            if callback in self.subscribers:
+                self.subscribers.remove(callback)
+                logger.debug(f"Subscriber {callback.__name__} removed.")
 
     def notify_subscribers(self, event: dict):
         """
         Notifies all subscribers of an event.
 
         Parameters
         ----------
         event : dict
             The Kubernetes event data.
         """
-        for subscriber in self.subscribers:
-            try:
-                subscriber(event)
-            except Exception as e:
-                logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
+        with self._lock:
+            for subscriber in self.subscribers:
+                try:
+                    subscriber(event)
+                except Exception as e:
+                    logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
 
     def watch_pods(self):
         """
@@ -93,7 +97,7 @@ def watch_pods(self):
             try:
                 kwargs = {
                     'namespace': self.namespace,
-                    'timeout_seconds': 0,
+                    'timeout_seconds': 300,
                 }
                 if resource_version:
                     kwargs['resource_version'] = resource_version

From f633d5a8bf35f0cb72b171ac70eeafcac8635dc9 Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Thu, 28 Nov 2024 11:49:59 +0100
Subject: [PATCH 11/19] change timeout_seconds watcher param back to 0 to try
 and hold the connection forever

---
 scrapyd_k8s/k8s_resource_watcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py
index 812cf0e..7e9991e 100644
--- a/scrapyd_k8s/k8s_resource_watcher.py
+++ b/scrapyd_k8s/k8s_resource_watcher.py
@@ -97,7 +97,7 @@ def watch_pods(self):
             try:
                 kwargs = {
                     'namespace': self.namespace,
-                    'timeout_seconds': 300,
+                    'timeout_seconds': 0,
                 }
                 if resource_version:
                     kwargs['resource_version'] = resource_version
From 9c64ce457fb564ed4de28486e06b324bd766dacf Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Tue, 3 Dec 2024 15:51:14 +0100
Subject: [PATCH 12/19] remove job_name mapping and implement a stateless
 directory scan to check for log files of the jobs that are still on the
 cluster in Completed or Failed state

---
 scrapyd_k8s/joblogs/log_handler_k8s.py | 39 +++++++++++++++++++++-----
 1 file changed, 32 insertions(+), 7 deletions(-)

diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
index 836e8f7..8f77983 100644
--- a/scrapyd_k8s/joblogs/log_handler_k8s.py
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -22,8 +22,6 @@ class KubernetesJobLogHandler:
         Configuration object containing settings for job logs and storage.
     watcher_threads : dict
         Dictionary to keep track of watcher threads for each pod.
-    pod_tmp_mapping : dict
-        Mapping of pod names to their temporary log file paths.
     namespace : str
         Kubernetes namespace to watch pods in.
     num_lines_to_check : int
@@ -60,11 +58,37 @@ def __init__(self, config):
         """
         self.config = config
         self.watcher_threads = {}
-        self.pod_tmp_mapping = {}
         self.namespace = config.namespace()
        self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
         self.object_storage_provider = LibcloudObjectStorage(self.config)
 
+    def get_existing_log_filename(self, job_name):
+        """
+        Retrieves the existing temporary log file path for a job without creating a new one.
+
+        Parameters
+        ----------
+        job_name : str
+            Name of the Kubernetes job or pod.
+
+        Returns
+        -------
+        str or None
+            Path to the existing temporary log file for the given job, or None if no such file exists.
+        """
+        temp_dir = tempfile.gettempdir()
+        app_temp_dir = os.path.join(temp_dir, 'job_logs')
+
+        if not os.path.isdir(app_temp_dir):
+            return None
+
+        # Check for existing files matching the job_name
+        for filename in os.listdir(app_temp_dir):
+            if filename.startswith(f"{job_name}_logs_") and filename.endswith(".txt"):
+                return os.path.join(app_temp_dir, filename)
+
+        return None
+
     def get_last_n_lines(self, file_path, num_lines):
         """
         Efficiently retrieves the last `num_lines` lines from a file.
@@ -155,14 +179,15 @@ def make_log_filename_for_job(self, job_name):
         str
             Path to the temporary log file for the given job.
         """
-        if self.pod_tmp_mapping.get(job_name) is not None:
-            return self.pod_tmp_mapping[job_name]
+        existing_file = self.get_existing_log_filename(job_name)
+        if existing_file:
+            return existing_file
+        # Create a new log file if no existing file is found
         temp_dir = tempfile.gettempdir()
         app_temp_dir = os.path.join(temp_dir, 'job_logs')
         os.makedirs(app_temp_dir, exist_ok=True)
         fd, path = tempfile.mkstemp(prefix=f"{job_name}_logs_", suffix=".txt", dir=app_temp_dir)
         os.close(fd)
-        self.pod_tmp_mapping[job_name] = path
         return path
 
     def stream_logs(self, job_name):
@@ -245,7 +270,7 @@ def handle_events(self, event):
                     )
                     self.watcher_threads[thread_name].start()
             elif pod.status.phase in ['Succeeded', 'Failed']:
-                log_filename = self.pod_tmp_mapping.get(pod_name)
+                log_filename = self.get_existing_log_filename(pod_name)
                 if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
                     if self.object_storage_provider.object_exists(job_id):
                         logger.info(f"Log file for job '{job_id}' already exists in storage.")

From 27317ebf2954c4f587da3ae9dfb08ab8d7a24b04 Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 4 Dec 2024 08:56:14 +0100
Subject: [PATCH 13/19] remove empty line

---
 scrapyd_k8s/joblogs/log_handler_k8s.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
index 8f77983..790b0dd 100644
--- a/scrapyd_k8s/joblogs/log_handler_k8s.py
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -78,7 +78,6 @@ def get_existing_log_filename(self, job_name):
         """
         temp_dir = tempfile.gettempdir()
         app_temp_dir = os.path.join(temp_dir, 'job_logs')
-
         if not os.path.isdir(app_temp_dir):
             return None
 
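Note: the body of get_last_n_lines() is not shown in these hunks; below is a sketch of the block-wise tail read that its docstring and DEFAULT_BLOCK_SIZE = 6144 imply — an assumed implementation reading fixed-size blocks backwards from the end of the file, not the project's exact code:

    import os

    def get_last_n_lines_sketch(file_path, num_lines, block_size=6144):
        # Read backwards in fixed-size blocks until enough newlines are seen.
        if num_lines <= 0 or not os.path.isfile(file_path):
            return []
        with open(file_path, 'rb') as f:
            f.seek(0, os.SEEK_END)
            end = f.tell()
            data = b''
            while end > 0 and data.count(b'\n') <= num_lines:
                start = max(0, end - block_size)
                f.seek(start)
                data = f.read(end - start) + data
                end = start
        return [line.decode('utf-8', errors='replace')
                for line in data.splitlines()[-num_lines:]]
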
""" - existing_file = self.get_existing_log_filename(job_name) - if existing_file: - return existing_file - # Create a new log file if no existing file is found - temp_dir = tempfile.gettempdir() - app_temp_dir = os.path.join(temp_dir, 'job_logs') - os.makedirs(app_temp_dir, exist_ok=True) - fd, path = tempfile.mkstemp(prefix=f"{job_name}_logs_", suffix=".txt", dir=app_temp_dir) - os.close(fd) - return path + + if not os.path.isdir(self.logs_dir): + os.makedirs(self.logs_dir) + + log_file_path = os.path.join(self.logs_dir, f"{job_name}.txt") + if os.path.exists(log_file_path): + return log_file_path + + with open(log_file_path, 'w') as file: + pass + + return log_file_path + + def stream_logs(self, job_name): """ From fc50b64a317a483b90af73aa837bdc746bdd92d8 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 17 Dec 2024 09:07:10 +0100 Subject: [PATCH 15/19] describe a new parameter in CONFIG.md to provide info on how to set a directory for logs --- CONFIG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONFIG.md b/CONFIG.md index dca978d..4d395d8 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -13,6 +13,7 @@ stick to [scrapyd's configuration](https://scrapyd.readthedocs.io/en/latest/conf * `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username)) * `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password)) * `log_level` - Log level, defaults to `INFO` +* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes), if not provided there is a default value `/tmp/scrapyd_k8s_logs`. When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody` so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems. The Docker and Kubernetes launchers have their own additional options. From 1189e80308021a590bd315fcc5872a56d217d3f4 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 17 Dec 2024 10:08:52 +0100 Subject: [PATCH 16/19] remove job_name as it is not correct, use job_id for file names and pod_name to indicate which stdout (logs) to read from --- scrapyd_k8s/joblogs/log_handler_k8s.py | 41 ++++++++++++++------------ 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py index 9371b33..a568034 100644 --- a/scrapyd_k8s/joblogs/log_handler_k8s.py +++ b/scrapyd_k8s/joblogs/log_handler_k8s.py @@ -72,21 +72,21 @@ def __init__(self, config): self.logs_dir = self.config.scrapyd().get('logs_dir', '/tmp/scrapyd_k8s_logs').strip() self.object_storage_provider = LibcloudObjectStorage(self.config) - def get_existing_log_filename(self, job_name): + def get_existing_log_filename(self, job_id): """ Retrieves the existing temporary log file path for a job without creating a new one. Parameters ---------- - job_name : str - Name of the Kubernetes job or pod, which is also the name of the log file. + job_id : str + ID of the Kubernetes job or pod, which is also the name of the log file. Returns ------- str or None Path to the existing temporary log file for the given job, or None if no such file exists. 
""" - log_file_path = os.path.join(self.logs_dir, f"{job_name}.txt") + log_file_path = os.path.join(self.logs_dir, f"{job_id}.txt") if os.path.isfile(log_file_path): return log_file_path return None @@ -167,14 +167,14 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz except (IOError, OSError) as e: logger.error(f"Failed to concatenate and delete files for job: {e}") - def make_log_filename_for_job(self, job_name): + def make_log_filename_for_job(self, job_id): """ Creates a log file path for a job, using the job name as the file name or returns a path to an existing file. Parameters ---------- - job_name : str - Name of the Kubernetes job. + job_id : str + ID of the Kubernetes job. Returns ------- @@ -185,7 +185,7 @@ def make_log_filename_for_job(self, job_name): if not os.path.isdir(self.logs_dir): os.makedirs(self.logs_dir) - log_file_path = os.path.join(self.logs_dir, f"{job_name}.txt") + log_file_path = os.path.join(self.logs_dir, f"{job_id}.txt") if os.path.exists(log_file_path): return log_file_path @@ -196,14 +196,17 @@ def make_log_filename_for_job(self, job_name): - def stream_logs(self, job_name): + def stream_logs(self, job_id, pod_name): """ Streams logs from a Kubernetes pod and writes them to a file. Parameters ---------- - job_name : str - Name of the Kubernetes pod to stream logs from. + job_id : str + ID of the Kubernetes job to use as a log file name. + + pod_name : str + Name of the Kubernetes pod to read logs from. Returns ------- @@ -212,20 +215,20 @@ def stream_logs(self, job_name): log_lines_counter = 0 v1 = client.CoreV1Api() w = watch.Watch() - log_file_path = self.make_log_filename_for_job(job_name) + log_file_path = self.make_log_filename_for_job(job_id) last_n_lines = self.get_last_n_lines(log_file_path, self.num_lines_to_check) if len(last_n_lines) == 0: - logger.info(f"Log file '{log_file_path}' is empty or not found. Starting fresh logs for job '{job_name}'.") + logger.info(f"Log file '{log_file_path}' is empty or not found. 
Starting fresh logs for job '{job_id}'.") try: with open(log_file_path, 'a') as log_file: temp_dir = os.path.dirname(log_file_path) with tempfile.NamedTemporaryFile(mode='w+', delete=False, dir=temp_dir, - prefix=f"{job_name}_logs_tmp_", suffix=".txt") as temp_logs: + prefix=f"{job_id}_logs_tmp_", suffix=".txt") as temp_logs: temp_file_path = temp_logs.name for line in w.stream( v1.read_namespaced_pod_log, - name=job_name, + name=pod_name, namespace=self.namespace, follow=True, _preload_content=False @@ -245,9 +248,9 @@ def stream_logs(self, job_name): self.concatenate_and_delete_files(log_file_path, temp_file_path) else: os.remove(temp_file_path) - logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_name}'.") + logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_id}'.") except Exception as e: - logger.exception(f"Error streaming logs for job '{job_name}': {e}") + logger.exception(f"Error streaming logs for job '{job_id}': {e}") def handle_events(self, event): """ @@ -272,11 +275,11 @@ def handle_events(self, event): else: self.watcher_threads[thread_name] = threading.Thread( target=self.stream_logs, - args=(pod_name,) + args=(job_id, pod_name,) ) self.watcher_threads[thread_name].start() elif pod.status.phase in ['Succeeded', 'Failed']: - log_filename = self.get_existing_log_filename(pod_name) + log_filename = self.get_existing_log_filename(job_id) if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0: if self.object_storage_provider.object_exists(job_id): logger.info(f"Log file for job '{job_id}' already exists in storage.") From 87ce2c8b7f75e7fbcec9c3b31eef66e6246cd830 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 17 Dec 2024 10:40:32 +0100 Subject: [PATCH 17/19] move logs_dir param to the joblogs section in config, adjust code to read it from there if provided --- CONFIG.md | 5 ++++- scrapyd_k8s/joblogs/log_handler_k8s.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index 4d395d8..f1aaa6b 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -13,7 +13,6 @@ stick to [scrapyd's configuration](https://scrapyd.readthedocs.io/en/latest/conf * `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username)) * `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password)) * `log_level` - Log level, defaults to `INFO` -* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes), if not provided there is a default value `/tmp/scrapyd_k8s_logs`. When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody` so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems. The Docker and Kubernetes launchers have their own additional options. @@ -46,6 +45,10 @@ For Kubernetes, it is important to set resource limits. TODO: explain how to set limits, with default, project and spider specificity. +### [joblogs] section +* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes), if not provided there is a default value `/tmp/scrapyd_k8s_logs`. 
When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody` so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems. + + ### Kubernetes API interaction diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py index a568034..3bd057d 100644 --- a/scrapyd_k8s/joblogs/log_handler_k8s.py +++ b/scrapyd_k8s/joblogs/log_handler_k8s.py @@ -69,7 +69,7 @@ def __init__(self, config): self.watcher_threads = {} self.namespace = config.namespace() self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0)) - self.logs_dir = self.config.scrapyd().get('logs_dir', '/tmp/scrapyd_k8s_logs').strip() + self.logs_dir = self.config.joblogs().get('logs_dir', '/tmp/scrapyd_k8s_logs').strip() self.object_storage_provider = LibcloudObjectStorage(self.config) def get_existing_log_filename(self, job_id): From b2dd99243616b046a3515481b6c8882b33dec4ff Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 17 Dec 2024 17:16:46 +0100 Subject: [PATCH 18/19] remove default value from the code, make logs_dir a mandatory param in the joblogs section of the config file, in the KubernetesJobLogHandler init throw a value error if logs_dir is not provided in the config --- CONFIG.md | 2 +- kubernetes.yaml | 3 +++ scrapyd_k8s/joblogs/log_handler_k8s.py | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index f1aaa6b..c5438ff 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -46,7 +46,7 @@ For Kubernetes, it is important to set resource limits. TODO: explain how to set limits, with default, project and spider specificity. ### [joblogs] section -* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes), if not provided there is a default value `/tmp/scrapyd_k8s_logs`. When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody` so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems. +* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes). When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody` so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems. 
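Note: after patch 17, a configuration using these options would look roughly like the snippet below; `num_lines_to_check` is the other `[joblogs]` key the handler reads (it defaults to 0), and the values shown are examples:

    [joblogs]
    # child directory under /tmp, writable by the `nobody` user the image runs as
    logs_dir = /tmp/scrapyd_k8s_logs
    # lines compared when resuming an interrupted stream; 0 disables the check
    num_lines_to_check = 2
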
From b2dd99243616b046a3515481b6c8882b33dec4ff Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Tue, 17 Dec 2024 17:16:46 +0100
Subject: [PATCH 18/19] remove default value from the code; make logs_dir a
 mandatory param in the joblogs section of the config file; in the
 KubernetesJobLogHandler init, raise a ValueError if logs_dir is not provided
 in the config

---
 CONFIG.md                              | 2 +-
 kubernetes.yaml                        | 3 +++
 scrapyd_k8s/joblogs/log_handler_k8s.py | 4 +++-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/CONFIG.md b/CONFIG.md
index f1aaa6b..c5438ff 100644
--- a/CONFIG.md
+++ b/CONFIG.md
@@ -46,7 +46,7 @@
 For Kubernetes, it is important to set resource limits.
 
 TODO: explain how to set limits, with default, project and spider specificity.
 
 ### [joblogs] section
-* `logs_dir`  - a directory to store log files collected on the k8s cluster (implemented only for Kubernetes); if not provided, the default is `/tmp/scrapyd_k8s_logs`. When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody`, so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems.
+* `logs_dir`  - a directory to store log files collected on the k8s cluster (implemented only for Kubernetes). When configuring, keep in mind that in the Dockerfile the `USER` is set to `nobody`, so not all directories are writable, but if you make a child directory under `/tmp` you won't encounter permission problems.
 
diff --git a/kubernetes.yaml b/kubernetes.yaml
index 75813a6..75bd2c4 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -104,6 +104,9 @@ data:
       requests_memory = 0.2G
       limits_cpu = 0.8
       limits_memory = 0.5G
+
+    [joblogs]
+    logs_dir = /tmp/scrapyd_k8s_logs
 ---
 apiVersion: v1
 kind: Secret
diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
index 3bd057d..3d1a281 100644
--- a/scrapyd_k8s/joblogs/log_handler_k8s.py
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -69,7 +69,9 @@ def __init__(self, config):
         self.watcher_threads = {}
         self.namespace = config.namespace()
         self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
-        self.logs_dir = self.config.joblogs().get('logs_dir', '/tmp/scrapyd_k8s_logs').strip()
+        self.logs_dir = (self.config.joblogs().get('logs_dir') or '').strip()
+        if not self.logs_dir:
+            raise ValueError("Configuration error: 'logs_dir' is missing in joblogs configuration section.")
         self.object_storage_provider = LibcloudObjectStorage(self.config)

From b682edc1bc0b6aed906cdf500d4c2066eefb57fd Mon Sep 17 00:00:00 2001
From: Valeriia Klestova
Date: Wed, 18 Dec 2024 11:03:57 +0100
Subject: [PATCH 19/19] add functionality to remove successfully uploaded
 files; additionally, when old jobs are still present on the cluster and
 their log files already exist in the S3 storage, remove the local file if it
 is still in the volume

---
 scrapyd_k8s/joblogs/log_handler_k8s.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
index 3d1a281..61c4592 100644
--- a/scrapyd_k8s/joblogs/log_handler_k8s.py
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -285,8 +285,14 @@ def handle_events(self, event):
                 if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
                     if self.object_storage_provider.object_exists(job_id):
                         logger.info(f"Log file for job '{job_id}' already exists in storage.")
+                        if os.path.exists(log_filename):
+                            os.remove(log_filename)
+                            logger.info(
+                                f"Removed local log file '{log_filename}' since it already exists in storage.")
                     else:
                         self.object_storage_provider.upload_file(log_filename)
+                        os.remove(log_filename)
+                        logger.info(f"Removed local log file '{log_filename}' after successful upload.")
                 else:
                     logger.info(f"Logfile not found for job '{job_id}'")
             else:
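
Note: taken together, the watcher and the log handler are wired roughly as below. This is assumed wiring based on the classes and methods shown in these patches (the actual hook-up lives outside this series):

    from scrapyd_k8s.config import Config
    from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher
    from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler

    config = Config()  # loads the scrapyd-k8s configuration file
    watcher = ResourceWatcher(namespace=config.namespace(), config=config)
    handler = KubernetesJobLogHandler(config)

    # Pod events drive both log streaming and the upload-on-completion path.
    watcher.subscribe(handler.handle_events)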