Split shared cache in backend and frontend #443

Merged
merged 2 commits on Oct 27, 2024
executorlib/backend/cache_parallel.py (1 addition & 1 deletion)

@@ -3,7 +3,7 @@

 import cloudpickle

-from executorlib.shared.cache import backend_load_file, backend_write_file
+from executorlib.cache.backend import backend_load_file, backend_write_file

💡 Codebase verification

Migration to executorlib.cache.backend is incomplete

The architectural split is not fully implemented:

  • backend.py still imports from executorlib.shared.cache
  • executor.py still uses old imports from executorlib.shared.cache
  • Only the backend execution modules (cache_parallel.py and cache_serial.py) have adopted the new import path

This suggests the migration is only partially done and needs to be completed for architectural consistency.

🔗 Analysis chain

LGTM! Verify architectural consistency.

The import path change aligns with the PR objective of splitting shared cache between backend and frontend.

Let's verify the architectural consistency:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that other backend modules follow the same import pattern
# and that frontend modules don't import from backend cache

# Check if other backend modules use the new import path
echo "Checking backend modules for consistent import patterns:"
rg -l "from executorlib.cache.backend import" "executorlib/backend/"

# Verify frontend doesn't import from backend cache
echo "Checking frontend modules don't import from backend cache:"
rg -l "from executorlib.cache.backend import" "executorlib/frontend/"

# Check for any remaining references to the old path
echo "Checking for remaining references to old path:"
rg -l "from executorlib.shared.cache import" "executorlib/"

Length of output: 749


Script:

#!/bin/bash
# Let's check the content of these files to understand the import structure better

# Check the implementation in backend.py to see if it's properly split
echo "Content of executorlib/cache/backend.py:"
cat executorlib/cache/backend.py

# Check executor.py to understand why it still uses the old import
echo -e "\nContent of executorlib/cache/executor.py:"
cat executorlib/cache/executor.py

# Check cache_serial.py to verify consistent usage pattern
echo -e "\nContent of executorlib/backend/cache_serial.py:"
cat executorlib/backend/cache_serial.py

Length of output: 3643



 def main() -> None:
executorlib/backend/cache_serial.py (2 additions & 2 deletions)

@@ -1,6 +1,6 @@
 import sys

-from executorlib.shared.cache import execute_task_in_file
+from executorlib.cache.backend import backend_execute_task_in_file

 if __name__ == "__main__":
-    execute_task_in_file(file_name=sys.argv[1])
+    backend_execute_task_in_file(file_name=sys.argv[1])
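For orientation, the frontend runs this script in a subprocess with the task file as its only argument (_get_execute_command builds the command list and execute_in_subprocess launches it, per executorlib/shared/cache.py below). A minimal by-hand sketch; the task file path and its .h5in extension are assumptions:

# Hypothetical manual invocation of the serial backend.
import subprocess
import sys

subprocess.run(
    [sys.executable, "executorlib/backend/cache_serial.py", "/tmp/task.h5in"],
    check=True,
)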
executorlib/cache/backend.py (new file, 64 additions)

@@ -0,0 +1,64 @@
import os
from typing import Any

from executorlib.shared.cache import FutureItem
from executorlib.shared.hdf import dump, load


def backend_load_file(file_name: str) -> dict:
    """
    Load the data from an HDF5 file and convert FutureItem objects to their results.

    Args:
        file_name (str): The name of the HDF5 file.

    Returns:
        dict: The loaded data from the file.

    """
    apply_dict = load(file_name=file_name)
    apply_dict["args"] = [
        arg if not isinstance(arg, FutureItem) else arg.result()
        for arg in apply_dict["args"]
    ]
    apply_dict["kwargs"] = {
        key: arg if not isinstance(arg, FutureItem) else arg.result()
        for key, arg in apply_dict["kwargs"].items()
    }
    return apply_dict
Comment on lines +8 to +28

⚠️ Potential issue

Add error handling for file operations and Future resolution

While the implementation is correct, it lacks error handling for potential failure scenarios:

  • File I/O operations might fail
  • FutureItem.result() could raise exceptions

Consider adding try-except blocks:

 def backend_load_file(file_name: str) -> dict:
-    apply_dict = load(file_name=file_name)
-    apply_dict["args"] = [
-        arg if not isinstance(arg, FutureItem) else arg.result()
-        for arg in apply_dict["args"]
-    ]
-    apply_dict["kwargs"] = {
-        key: arg if not isinstance(arg, FutureItem) else arg.result()
-        for key, arg in apply_dict["kwargs"].items()
-    }
-    return apply_dict
+    try:
+        apply_dict = load(file_name=file_name)
+        try:
+            apply_dict["args"] = [
+                arg if not isinstance(arg, FutureItem) else arg.result()
+                for arg in apply_dict["args"]
+            ]
+            apply_dict["kwargs"] = {
+                key: arg if not isinstance(arg, FutureItem) else arg.result()
+                for key, arg in apply_dict["kwargs"].items()
+            }
+            return apply_dict
+        except Exception as e:
+            raise RuntimeError(f"Failed to resolve FutureItems: {str(e)}") from e
+    except Exception as e:
+        raise RuntimeError(f"Failed to load file {file_name}: {str(e)}") from e
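To make the FutureItem handling concrete, here is a hedged sketch of the resolution step backend_load_file performs; it assumes FutureItem is constructed from the upstream task's output file name (matching the _file_name attribute visible in shared/cache.py below) and that result() returns the value stored in that file:

# Illustration only: a FutureItem placeholder standing in for an upstream
# task's result is swapped for its concrete value before the task runs.
from executorlib.shared.cache import FutureItem

apply_dict = {"args": [1, FutureItem(file_name="/tmp/upstream.h5out")], "kwargs": {}}
resolved_args = [
    arg if not isinstance(arg, FutureItem) else arg.result()
    for arg in apply_dict["args"]
]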


def backend_write_file(file_name: str, output: Any) -> None:
    """
    Write the output to an HDF5 file.

    Args:
        file_name (str): The name of the HDF5 file.
        output (Any): The output to be written.

    Returns:
        None

    """
    file_name_out = os.path.splitext(file_name)[0]
    os.rename(file_name, file_name_out + ".h5ready")
    dump(file_name=file_name_out + ".h5ready", data_dict={"output": output})
    os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
Comment on lines +31 to +46

⚠️ Potential issue

Improve robustness of file operations

The current implementation could be more robust in handling edge cases and failures:

  1. Temporary files might be left behind if the operation fails
  2. Existing .h5ready files could cause conflicts
  3. Write permissions aren't verified upfront

Consider this improved implementation:

 def backend_write_file(file_name: str, output: Any) -> None:
-    file_name_out = os.path.splitext(file_name)[0]
-    os.rename(file_name, file_name_out + ".h5ready")
-    dump(file_name=file_name_out + ".h5ready", data_dict={"output": output})
-    os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
+    file_name_out = os.path.splitext(file_name)[0]
+    temp_file = file_name_out + ".h5ready"
+    output_file = file_name_out + ".h5out"
+    
+    # Check permissions early
+    if not os.access(os.path.dirname(file_name_out), os.W_OK):
+        raise PermissionError(f"No write permission in directory: {os.path.dirname(file_name_out)}")
+    
+    # Clean up any existing temporary files
+    if os.path.exists(temp_file):
+        os.remove(temp_file)
+    
+    try:
+        os.rename(file_name, temp_file)
+        try:
+            dump(file_name=temp_file, data_dict={"output": output})
+            os.rename(temp_file, output_file)
+        except Exception as e:
+            # Restore original file if dump fails
+            os.rename(temp_file, file_name)
+            raise RuntimeError(f"Failed to write output: {str(e)}") from e
+    except Exception as e:
+        raise RuntimeError(f"Failed to process file {file_name}: {str(e)}") from e
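Beyond error handling, it may help to spell out the three-state file handoff that backend_write_file implements. The state semantics below are inferred from the renames, the .h5in extension is assumed, and the polling helper is a hypothetical illustration rather than part of the PR:

# Inferred handoff states:
#   task.h5in    -> submitted, waiting for a worker
#   task.h5ready -> claimed; the output is being written into the file
#   task.h5out   -> complete; the frontend can read the "output" entry
import os

def is_task_done(file_name: str) -> bool:
    # Hypothetical frontend-side check for the completed-output marker.
    return os.path.exists(os.path.splitext(file_name)[0] + ".h5out")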



def backend_execute_task_in_file(file_name: str) -> None:
    """
    Execute the task stored in a given HDF5 file.

    Args:
        file_name (str): The file name of the HDF5 file as an absolute path.

    Returns:
        None
    """
    apply_dict = backend_load_file(file_name=file_name)
    result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
    backend_write_file(
        file_name=file_name,
        output=result,
    )
Comment on lines +49 to +64

⚠️ Potential issue

Improve function validation and documentation

The implementation needs better validation and error handling:

  1. No validation that 'fn' exists and is callable
  2. No error handling for the function execution
  3. Docstring should document the expected file format

Consider these improvements:

 def backend_execute_task_in_file(file_name: str) -> None:
     """
     Execute the task stored in a given HDF5 file.
 
     Args:
         file_name (str): The file name of the HDF5 file as an absolute path.
 
+    Expected file format:
+        The HDF5 file should contain a dictionary with:
+        - 'fn': A callable object
+        - 'args': List of positional arguments
+        - 'kwargs': Dictionary of keyword arguments
+
     Returns:
         None
+
+    Raises:
+        RuntimeError: If the file format is invalid or execution fails
     """
     apply_dict = backend_load_file(file_name=file_name)
-    result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
-    backend_write_file(
-        file_name=file_name,
-        output=result,
-    )
+    
+    # Validate function
+    if "fn" not in apply_dict:
+        raise RuntimeError("Missing 'fn' in task file")
+    if not callable(apply_dict["fn"]):
+        raise RuntimeError("The 'fn' object is not callable")
+    
+    try:
+        result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
+        backend_write_file(
+            file_name=file_name,
+            output=result,
+        )
+    except Exception as e:
+        raise RuntimeError(f"Task execution failed: {str(e)}") from e
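As a complement to the validation suggestions, a hedged end-to-end sketch of what a task file contains and how the backend consumes it. It assumes dump from executorlib.shared.hdf can serialize the callable (plausible given the cloudpickle-based serialization discussed below); the file path and .h5in extension are illustrative:

# Hypothetical round trip: write a task file, then execute it.
from executorlib.shared.hdf import dump
from executorlib.cache.backend import backend_execute_task_in_file

dump(
    file_name="/tmp/task.h5in",
    data_dict={"fn": sum, "args": [[1, 2, 3]], "kwargs": {}},
)
backend_execute_task_in_file(file_name="/tmp/task.h5in")
# backend_write_file renames the file to /tmp/task.h5ready, stores
# {"output": 6}, then renames it to /tmp/task.h5out.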

Comment on lines +1 to +64

💡 Codebase verification

Security Advisory: Critical - Unsafe Deserialization of Arbitrary Functions

The verification confirms and elevates the security concern. The codebase uses cloudpickle to serialize and deserialize arbitrary Python functions from HDF5 files without any validation:

  1. In executorlib/shared/hdf.py, load() deserializes arbitrary functions using cloudpickle.loads() without any security checks
  2. Multiple components execute these functions directly:
    • executorlib/cache/backend.py
    • executorlib/interactive/backend.py
    • executorlib/backend/cache_parallel.py

Critical security risks:

  • Remote Code Execution (RCE) through malicious pickled data
  • No function whitelist or validation
  • No sandboxing of execution
  • Used in parallel execution contexts (MPI)

Immediate recommendations:

  1. Implement function validation before deserialization (a minimal sketch follows this list)
  2. Use a secure serialization format or implement signing
  3. Run untrusted code in a restricted environment
  4. Add input validation for HDF5 files
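
A minimal guard along the lines of recommendation 1, purely illustrative and not part of this PR; the registry contents and function name are hypothetical, and note that a name check applied after cloudpickle.loads() does not remove the deserialization risk itself, so it only complements recommendation 2:

# Hypothetical pre-execution guard: refuse callables that are not registered.
ALLOWED_FUNCTIONS = {"mypackage.steps.run_simulation"}  # illustrative entry

def validate_callable(fn) -> None:
    qualified = f"{fn.__module__}.{fn.__qualname__}"
    if qualified not in ALLOWED_FUNCTIONS:
        raise PermissionError(f"Refusing non-allowlisted function: {qualified}")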
🔗 Analysis chain

Security Advisory: Validate input sources

The code executes arbitrary functions loaded from HDF5 files, which could be a security risk if the files come from untrusted sources. Consider:

  1. Implementing a whitelist of allowed functions
  2. Adding validation of input file sources
  3. Running in a sandboxed environment

Let's check for other instances where HDF5 files are loaded and executed:


Let me search for HDF file operations and function execution patterns using different approaches, since the previous command had issues with file-type recognition.


Let me examine the test files and implementation details to understand if there are any existing security measures or validation patterns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for HDF5 file operations and function execution
rg -l "load.*\.h5|dump.*\.h5|__call__.*args" --type python

Length of output: 91


Script:

#!/bin/bash
# Search for HDF5 file operations and function execution with ripgrep
rg -l "\.h5|__call__.*args|dump\(|load\("

# Search for imports of HDF functions
rg "from.*hdf.*import"

# Look for function execution patterns
ast-grep --pattern 'fn.__call__($$$)'

Length of output: 733


Script:

#!/bin/bash
# Check how the HDF files are used in the test files to understand validation
rg -A 5 "\.h5" tests/

# Look at the implementation of dump and load functions
cat executorlib/shared/hdf.py

# Check how functions are validated before execution in other files
rg -A 5 "__call__" executorlib/

Length of output: 7293
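To ground the advisory, the core risk is that unpickling attacker-controlled bytes executes code during deserialization, before the task function is ever called. A condensed sketch of the round trip that load() performs internally, using a benign payload:

import cloudpickle

# Benign stand-in for the serialized function stored in the HDF5 file.
payload = cloudpickle.dumps(lambda x: x + 1)
# loads() runs pickle reduce logic; a crafted payload can execute arbitrary
# code at this point, independent of whether fn is later invoked.
fn = cloudpickle.loads(payload)
print(fn(1))  # 2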

executorlib/shared/cache.py (2 additions & 61 deletions)

@@ -4,10 +4,10 @@
 import subprocess
 import sys
 from concurrent.futures import Future
-from typing import Any, Tuple
+from typing import Tuple

 from executorlib.shared.command import get_command_path
-from executorlib.shared.hdf import dump, get_output, load
+from executorlib.shared.hdf import dump, get_output
 from executorlib.shared.serialize import serialize_funct_h5

@@ -47,47 +47,6 @@ def done(self) -> bool:
         return get_output(file_name=self._file_name)[0]


-def backend_load_file(file_name: str) -> dict:
-    """
-    Load the data from an HDF5 file and convert FutureItem objects to their results.
-
-    Args:
-        file_name (str): The name of the HDF5 file.
-
-    Returns:
-        dict: The loaded data from the file.
-
-    """
-    apply_dict = load(file_name=file_name)
-    apply_dict["args"] = [
-        arg if not isinstance(arg, FutureItem) else arg.result()
-        for arg in apply_dict["args"]
-    ]
-    apply_dict["kwargs"] = {
-        key: arg if not isinstance(arg, FutureItem) else arg.result()
-        for key, arg in apply_dict["kwargs"].items()
-    }
-    return apply_dict
-
-
-def backend_write_file(file_name: str, output: Any) -> None:
-    """
-    Write the output to an HDF5 file.
-
-    Args:
-        file_name (str): The name of the HDF5 file.
-        output (Any): The output to be written.
-
-    Returns:
-        None
-
-    """
-    file_name_out = os.path.splitext(file_name)[0]
-    os.rename(file_name, file_name_out + ".h5ready")
-    dump(file_name=file_name_out + ".h5ready", data_dict={"output": output})
-    os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
-
-
 def execute_in_subprocess(
     command: list, task_dependent_lst: list = []
 ) -> subprocess.Popen:

@@ -180,24 +139,6 @@ def execute_tasks_h5(
     }


-def execute_task_in_file(file_name: str) -> None:
-    """
-    Execute the task stored in a given HDF5 file.
-
-    Args:
-        file_name (str): The file name of the HDF5 file as an absolute path.
-
-    Returns:
-        None
-    """
-    apply_dict = backend_load_file(file_name=file_name)
-    result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
-    backend_write_file(
-        file_name=file_name,
-        output=result,
-    )
-
-
 def _get_execute_command(file_name: str, cores: int = 1) -> list:
     """
     Get command to call backend as a list of two strings
tests/test_cache_shared.py (1 addition & 1 deletion)

@@ -6,10 +6,10 @@
 try:
     from executorlib.shared.cache import (
-        FutureItem,
         execute_task_in_file,
         _check_task_output,
     )
+    from executorlib.cache.backend import FutureItem
     from executorlib.shared.hdf import dump
Comment on lines 7 to 13

💡 Codebase verification

Update imports to use backend functions

The verification reveals that:

  • execute_task_in_file has been moved to backend as backend_execute_task_in_file
  • _check_task_output is still in shared.cache and is used internally by the shared cache implementation
  • The test file is the only place still using the old execute_task_in_file import

You should:

  • Update the test to import backend_execute_task_in_file from executorlib.cache.backend instead of execute_task_in_file from shared (a sketch follows this list)
  • Keep the _check_task_output import as is since it's correctly located in shared.cache
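
A hedged sketch of what the updated test imports could look like after both bullets are applied; the skip flag and the exact grouping are guesses at the test file's conventions:

try:
    from executorlib.cache.backend import (
        FutureItem,
        backend_execute_task_in_file,
    )
    from executorlib.shared.cache import _check_task_output
    from executorlib.shared.hdf import dump

    skip_h5py_test = False
except ImportError:
    skip_h5py_test = True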
🔗 Analysis chain

Consider migrating remaining shared.cache imports.

The import change for FutureItem aligns with the cache splitting objective. However, other functions like execute_task_in_file and _check_task_output are still imported from shared.cache. Consider if these should also be moved to the backend module for consistency.

Let's verify the current location of these functions:


Let me gather more information about the function locations and their implementations:


Let me check one more thing to understand the complete picture:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if execute_task_in_file and _check_task_output are used elsewhere
# and if they should be moved to backend

# Search for usage of these functions
echo "Searching for execute_task_in_file usage:"
rg "execute_task_in_file"

echo -e "\nSearching for _check_task_output usage:"
rg "_check_task_output"

# Check if these functions are defined in both shared and backend
echo -e "\nChecking function definitions:"
ast-grep --pattern 'def execute_task_in_file'
ast-grep --pattern 'def _check_task_output'

Length of output: 1319


Script:

#!/bin/bash
# Check the implementation of execute_task_in_file in shared.cache
echo "Checking execute_task_in_file in shared.cache:"
rg -A 10 "def execute_task_in_file" executorlib/shared/cache.py

# Check the implementation in backend
echo -e "\nChecking backend_execute_task_in_file in backend:"
rg -A 10 "def backend_execute_task_in_file" executorlib/cache/backend.py

# Check if there are any other references to these functions in imports
echo -e "\nChecking imports:"
rg "from executorlib.shared.cache import.*execute_task_in_file"
rg "from executorlib.cache.backend import.*execute_task_in_file"

Length of output: 940


Script:

#!/bin/bash
# Check the content of shared.cache to understand the relationship between functions
echo "Checking shared.cache content:"
cat executorlib/shared/cache.py

# Also check if there are any direct references to these functions
echo -e "\nChecking direct references to execute_task_in_file:"
rg "\bexecute_task_in_file\b" --type py

Length of output: 8330

     from executorlib.shared.serialize import serialize_funct_h5