
Commit

Merge pull request #2261 from ganga-devs/autopep8-patches/FixDiracSplitter

PEP8 fixes for PR #2260 (FixDiracSplitter) by autopep8
mesmith75 authored Jan 11, 2024
2 parents 1dbcd13 + 25b04ee commit 9f2f737
Showing 1 changed file with 24 additions and 26 deletions.
ganga/GangaDirac/Lib/Splitters/OfflineGangaDiracSplitter.py
@@ -1,5 +1,4 @@
 from GangaCore.Core.exceptions import SplitterError
-from GangaDirac.Lib.Backends.DiracUtils import result_ok
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.logging import getLogger
 from GangaDirac.Lib.Utilities.DiracUtilities import execute, GangaDiracError
@@ -56,7 +55,7 @@
 number of SE add this subset to the list of used LFN and continue to the next
 unallocated LFN.
 So on the first iteration of the splitter we identify how many files we can put
 into a larger subset of files.
 i.e. how many jobs >= 0.75 * 100LFN and <= 100LFN with 2 site redundancy
@@ -85,7 +84,7 @@
-This favours generating larger subsets with multiple sites where the jobs can run
+This favours generating larger subsets with multiple sites where the jobs can run
 but when there are LFN which can't be allocated to sites with multiple SE the algorithm
 will attempt to find larger subsets with reduced redundancy.
 """
@@ -132,7 +131,7 @@ def find_random_site(original_SE_list, banned_SE):
     while chosen_element == "" and len(input_list) > 0:
         global global_random
         this_element = global_random.sample(input_list, 1)[0]
-        if not this_element in banned_SE:
+        if this_element not in banned_SE:
             chosen_element = this_element
             break
         else:
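
[Editor's note] The hunk cuts off at the else branch, so here is a self-contained sketch of the whole helper; the body of the truncated else branch (dropping the banned candidate and retrying) is an assumption based on how the loop must terminate:

import random

def find_random_site(original_SE_list, banned_SE, rng=random):
    # Pick one SE at random that is not banned; return "" if none is allowed.
    input_list = list(original_SE_list)  # work on a copy
    chosen_element = ""
    while chosen_element == "" and len(input_list) > 0:
        this_element = rng.sample(input_list, 1)[0]
        if this_element not in banned_SE:
            chosen_element = this_element
            break
        else:
            # Assumed: discard the banned candidate so the loop can end.
            input_list.remove(this_element)
    return chosen_element

print(find_random_site(["CERN-SE", "RAL-SE"], {"CERN-SE"}))  # -> "RAL-SE"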
@@ -146,7 +145,8 @@ def getLFNReplicas(allLFNs, index, allLFNData):
     This method gets the location of all replicas for 'allLFNs' and stores the information in 'allLFNData'
     This is a 'static' method which is called multiple times with the same 'allLFNs' and 'allLFNData' and different index.
-    e.g. This allows Dirac to determine the replicas for ~250LFN all at once rather than for ~40,000 all at once which risks timeouts and other errors
+    e.g. This allows Dirac to determine the replicas for ~250LFN all at once
+    rather than for ~40,000 all at once which risks timeouts and other errors
     Args:
         allLFNs (list): This is a list of all LFN which have replicas on the grid
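
[Editor's note] The batching described in this docstring (about 250 LFNs per query instead of tens of thousands) amounts to slicing the LFN list. A rough sketch; query_replicas is a hypothetical stand-in for the DIRAC call:

LFN_PARALLEL_LIMIT = 250  # roughly the ~250 LFN per query the docstring mentions

def lookup_in_batches(all_lfns, query_replicas):
    # Query replica locations slice by slice to avoid timeouts on huge lists.
    all_lfn_data = {}
    for start in range(0, len(all_lfns), LFN_PARALLEL_LIMIT):
        batch = all_lfns[start:start + LFN_PARALLEL_LIMIT]
        all_lfn_data.update(query_replicas(batch))  # {lfn: locations}
    return all_lfn_data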
@@ -242,8 +242,6 @@ def calculateSiteSEMapping(file_replicas, uniqueSE, CE_to_SE_mapping, SE_to_CE_mapping):
     """

     SE_dict = dict()
-    maps_size = 0
-    found = []

     logger.info("Calculating site<->SE Mapping")
@@ -302,11 +300,13 @@ def calculateSiteSEMapping(file_replicas, uniqueSE, CE_to_SE_mapping, SE_to_CE_mapping):

 def lookUpLFNReplicas(inputs, ignoremissing):
     """
-    This method launches several worker threads to collect the replica information for all LFNs which are given as inputs and stores this in allLFNData
+    This method launches several worker threads to collect the replica information for all LFNs,
+    which are given as inputs and stores this in allLFNData
     Args:
-        inputs (list): This is a list of input DiracFile which are
+        inputs (list): This is a list of input DiracFile which are
     Returns:
-        bad_lfns (list): A list of LFN which have no replica information when querying `getReplicasForJobs` from DIRAC
+        bad_lfns (list): A list of LFN which have no replica information when querying
+        `getReplicasForJobs` from DIRAC
     """
     allLFNData = {}
     # Build a useful dictionary and list
@@ -335,7 +335,7 @@ def lookUpLFNReplicas(inputs, ignoremissing):

     file_replicas = {}
     for _lfn in LFNdict:
-        if not _lfn in bad_lfns:
+        if _lfn not in bad_lfns:
             file_replicas[_lfn] = LFNdict[_lfn].locations

     # Check if we have any bad lfns
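
[Editor's note] The fixed loop above is equivalent to a dict comprehension; a tiny sketch with made-up data (plain lists stand in for the real .locations attribute):

LFNdict = {"/lhcb/a.dst": ["SE_A", "SE_B"], "/lhcb/b.dst": []}
bad_lfns = ["/lhcb/b.dst"]

file_replicas = {lfn: locations
                 for lfn, locations in LFNdict.items()
                 if lfn not in bad_lfns}
assert file_replicas == {"/lhcb/a.dst": ["SE_A", "SE_B"]}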
@@ -376,17 +376,15 @@ def updateLFNData(bad_lfns, allLFNs, LFNdict, ignoremissing, allLFNData):
     if upper_limit > len(allLFNs):
         upper_limit = len(allLFNs)

-    #logger.debug("Updating LFN Physical Locations: [%s:%s] of %s" % (str(i * LFN_parallel_limit), str(upper_limit), str(len(allLFNs))))
-
     for this_lfn in values.keys():
-        #logger.debug("LFN: %s" % str(this_lfn))
+        # logger.debug("LFN: %s" % str(this_lfn))
         this_dict = {}
         this_dict[this_lfn] = values.get(this_lfn)

         if this_lfn in LFNdict:
-            #logger.debug("Updating RemoteURLs")
+            # logger.debug("Updating RemoteURLs")
             LFNdict[this_lfn]._updateRemoteURLs(this_dict)
-            #logger.debug("This_dict: %s" % str(this_dict))
+            # logger.debug("This_dict: %s" % str(this_dict))
         else:
             logger.error("Error updating remoteURLs for: %s" % str(this_lfn))
@@ -467,8 +465,8 @@ def OfflineGangaDiracSplitter(_inputs, filesPerJob, maxFiles, ignoremissing, ban

     logger.debug("Found all SE in use")

-    #logger.info("%s" % str(CE_to_SE_mapping))
-    #logger.info("%s" % str(SE_to_CE_mapping))
+    # logger.info("%s" % str(CE_to_SE_mapping))
+    # logger.info("%s" % str(SE_to_CE_mapping))

     # BELOW IS WHERE THE ACTUAL SPLITTING IS DONE
@@ -490,13 +488,13 @@
         check_count = check_count + len(i)

     if check_count != len(inputs) - len(bad_lfns):
-        #First check if there are duplicates causing this problem
+        # First check if there are duplicates causing this problem
         allLFNs = [_lfn.lfn for _lfn in inputs]
         lfnset = set(allLFNs)
         del_list = list(allLFNs)
         for _l in lfnset:
             del_list.remove(_l)
-        if len(del_list)>0:
+        if len(del_list) > 0:
             raise SplitterError("Duplicate LFNs found, check your inputdata! %s" % del_list)

         logger.error("SERIOUS SPLITTING ERROR!!!!!")
@@ -580,13 +578,13 @@ def performSplitting(site_dict, filesPerJob, allChosenSets, wanted_common_site,

         # If subset is too small throw it away
         if len(_this_subset) < limit and len(_this_subset) < max_limit:
-            #logger.debug("%s < %s" % (str(len(_this_subset)), str(limit)))
+            # logger.debug("%s < %s" % (str(len(_this_subset)), str(limit)))
             allChosenSets[iterating_LFN] = generate_site_selection(
                 site_dict[iterating_LFN], wanted_common_site, uniqueSE, CE_to_SE_mapping, SE_to_CE_mapping)
             continue
         else:
-            #logger.info("found common LFN for: " + str(allChosenSets[iterating_LFN]))
-            #logger.info("%s > %s" % (str(len(_this_subset)), str(limit)))
+            # logger.info("found common LFN for: " + str(allChosenSets[iterating_LFN]))
+            # logger.info("%s > %s" % (str(len(_this_subset)), str(limit)))
             # else Dataset was large enough to be considered useful
             logger.debug("Generating Dataset of size: %s" % str(len(_this_subset)))
             # Construct DiracFile here as we want to keep the above combination
@@ -600,8 +598,8 @@ def performSplitting(site_dict, filesPerJob, allChosenSets, wanted_common_site,
         # Lets keep track of how many times we've tried this
         iterations = iterations + 1

-        #logger.info("Iteration: %s" % iterations)
-        #logger.info("%s %s" % (good_fraction, bad_fraction))
+        # logger.info("Iteration: %s" % iterations)
+        # logger.info("%s %s" % (good_fraction, bad_fraction))

         # Can take a while so lets not let threads become un-locked
         import GangaCore.Runtime.Repository_runtime
@@ -622,7 +620,7 @@ def performSplitting(site_dict, filesPerJob, allChosenSets, wanted_common_site,
             bad_fraction = 0.75
         else:
             good_fraction = good_fraction * 0.75
-            #bad_fraction = bad_fraction * 0.75
+            # bad_fraction = bad_fraction * 0.75

         logger.debug("good_fraction: %s" % str(good_fraction))
