Skip to content

Commit

Permalink
fix: added configurable buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Aug 13, 2024
1 parent 8612cf7 commit b8c7ffb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import random
import tempfile

from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemSection
from DIRAC.Core.Utilities.Os import getDiskSpace
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Utilities.DMSHelpers import DMSHelpers
Expand Down Expand Up @@ -227,9 +228,9 @@ def __checkDiskSpace(self, totalSize):
"""
diskSpace = getDiskSpace(self.__getDownloadDir(False)) # MB
availableBytes = diskSpace * 1024 * 1024 # bytes
# below can be a configuration option sent via the job wrapper in the future
# Moved from 3 to 5 GB (PhC 130822) for standard output file
bufferGBs = 5.0
bufferGBs = gConfig.getValue(
os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper", "InputDataBufferGBs"), 5.0
)
data = bufferGBs * 1024 * 1024 * 1024 # bufferGBs in bytes
if (data + totalSize) < availableBytes:
msg = f"Enough disk space available ({availableBytes} bytes)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,20 @@ def test_DLIDownloadFromBestSE_Fail(dli, mockSE, osPathExists):
assert not res["OK"]


def test_DLI_execute(dli, mockSE):
def test_DLI_execute(mocker, dli, mockSE):
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
dli._downloadFromSE = MagicMock(return_value=S_OK({"path": "/local/path/1.txt"}))
res = dli.execute(dataToResolve=["/a/lfn/1.txt"])
assert res["OK"]
assert not res["Value"]["Failed"]
assert "/a/lfn/1.txt" in res["Value"]["Successful"], res


def test_DLI_execute_getFileMetadata_Fails(dli, mockSE):
def test_DLI_execute_getFileMetadata_Fails(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{"Successful": {}, "Failed": {"/a/lfn/1.txt": "Error Getting MetaData"}}
Expand All @@ -137,8 +141,10 @@ def test_DLI_execute_getFileMetadata_Fails(dli, mockSE):
assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res


def test_DLI_execute_getFileMetadata_Lost(dli, mockSE):
def test_DLI_execute_getFileMetadata_Lost(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{
Expand All @@ -161,8 +167,10 @@ def test_DLI_execute_getFileMetadata_Lost(dli, mockSE):
assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res


def test_DLI_execute_getFileMetadata_Unavailable(dli, mockSE):
def test_DLI_execute_getFileMetadata_Unavailable(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{
Expand All @@ -185,8 +193,10 @@ def test_DLI_execute_getFileMetadata_Unavailable(dli, mockSE):
assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res


def test_DLI_execute_getFileMetadata_Cached(dli, mockSE):
def test_DLI_execute_getFileMetadata_Cached(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{
Expand All @@ -209,8 +219,10 @@ def test_DLI_execute_getFileMetadata_Cached(dli, mockSE):
assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res


def test_DLI_execute_FirstDownFailed(dli, mockSE):
def test_DLI_execute_FirstDownFailed(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{"Successful": {"/a/lfn/1.txt": {"Cached": 1, "Accessible": 0}}, "Failed": {}}
Expand All @@ -224,8 +236,10 @@ def test_DLI_execute_FirstDownFailed(dli, mockSE):
assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res


def test_DLI_execute_AllDownFailed(dli, mockSE):
def test_DLI_execute_AllDownFailed(mocker, dli, mockSE):
"""When getFileMetadata fails for the first SE, we should fall back to the second."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
mockObjectSE = mockSE.return_value
mockObjectSE.getFileMetadata.return_value = S_OK(
{"Successful": {"/a/lfn/1.txt": {"Cached": 1, "Accessible": 0}}, "Failed": {}}
Expand All @@ -239,8 +253,10 @@ def test_DLI_execute_AllDownFailed(dli, mockSE):
assert res["Value"]["Failed"][0] == "/a/lfn/1.txt", res


def test_DLI_execute_NoLocal(dli, mockSE):
def test_DLI_execute_NoLocal(mocker, dli, mockSE):
"""Data only at the remote SE."""
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo")
mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2)
dli = DownloadInputData(
{
"InputData": [],
Expand Down
2 changes: 2 additions & 0 deletions tests/Jenkins/dirac-cfg-update-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@
# to avoid having to wait while testing rescheduling
csAPI.setOption("Systems/WorkloadManagement/Production/Executors/Optimizers/JobScheduling/RescheduleDelays", "0")

csAPI.createSection("Systems/WorkloadManagement/Production/JobWrapper/")
csAPI.setOption("Systems/WorkloadManagement/Production/JobWrapper/InputDataBufferGBs", 1)

# Final action: commit in CS
res = csAPI.commit()
Expand Down

0 comments on commit b8c7ffb

Please sign in to comment.