Skip to content

Commit

Permalink
Assign ts.get_nbytes() to a variable
Browse files Browse the repository at this point in the history
To avoid calling this method repeatedly in a few cases, assign the
result to a variable and reuse it.
  • Loading branch information
jakirkham committed Feb 19, 2021
1 parent 0582557 commit 8b5b3de
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2168,10 +2168,11 @@ def transition_memory_released(self, key, safe: bint = False):
dts._waiting_on.add(ts)

# XXX factor this out?
ts_nbytes: Py_ssize_t = ts.get_nbytes()
for ws in ts._who_has:
ws._has_what.remove(ts)
ws._nbytes -= ts.get_nbytes()
ts._group._nbytes_in_memory -= ts.get_nbytes()
ws._nbytes -= ts_nbytes
ts._group._nbytes_in_memory -= ts_nbytes
worker_msgs[ws._address] = [
{
"op": "delete-data",
Expand Down Expand Up @@ -4053,9 +4054,10 @@ def stimulus_missing_data(

if cts is not None and cts._state == "memory": # couldn't find this
ws: WorkerState
cts_nbytes: Py_ssize_t = cts.get_nbytes()
for ws in cts._who_has: # TODO: this behavior is extreme
ws._has_what.remove(cts)
ws._nbytes -= cts.get_nbytes()
ws._nbytes -= cts_nbytes
cts._who_has.clear()
recommendations[cause] = "released"

Expand Down Expand Up @@ -4865,12 +4867,13 @@ async def gather(self, comm=None, keys=None, serializers=None):
)
if not workers or ts is None:
continue
ts_nbytes: Py_ssize_t = ts.get_nbytes()
for worker in workers:
ws = parent._workers_dv.get(worker)
if ws is not None and ts in ws._has_what:
ws._has_what.remove(ts)
ts._who_has.remove(ws)
ws._nbytes -= ts.get_nbytes()
ws._nbytes -= ts_nbytes
self.transitions({key: "released"})

self.log_event("all", {"action": "gather", "count": len(keys)})
Expand Down Expand Up @@ -5592,12 +5595,15 @@ def update_data(
if ts is None:
ts: TaskState = self.new_task(key, None, "memory")
ts.state = "memory"
if key in nbytes:
ts.set_nbytes(nbytes[key])
ts_nbytes: Py_ssize_t = nbytes.get(key, -1)
if ts_nbytes >= 0:
ts.set_nbytes(ts_nbytes)
else:
ts_nbytes = ts.get_nbytes()
for w in workers:
ws: WorkerState = parent._workers_dv[w]
if ts not in ws._has_what:
ws._nbytes += ts.get_nbytes()
ws._nbytes += ts_nbytes
ws._has_what.add(ts)
ts._who_has.add(ws)
self.report(
Expand Down Expand Up @@ -6717,13 +6723,14 @@ def _propagate_forgotten(
ts._dependencies.clear()
ts._waiting_on.clear()

ts_nbytes: Py_ssize_t = ts.get_nbytes()
if ts._who_has:
ts._group._nbytes_in_memory -= ts.get_nbytes()
ts._group._nbytes_in_memory -= ts_nbytes

ws: WorkerState
for ws in ts._who_has:
ws._has_what.remove(ts)
ws._nbytes -= ts.get_nbytes()
ws._nbytes -= ts_nbytes
w: str = ws._address
if w in state._workers_dv: # in case worker has died
worker_msgs[w] = [{"op": "delete-data", "keys": [key], "report": False}]
Expand Down

0 comments on commit 8b5b3de

Please sign in to comment.