Reformat basics.py and add check for the network size in set_topology.
BichengYing committed Mar 28, 2021
1 parent 6741599 commit f2eef01
Showing 1 changed file with 117 additions and 64 deletions.
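
The main functional change in this commit is the new guard in set_topology (and set_machine_topology) that rejects a graph whose node count does not match the number of BlueFog processes (or machines). Below is a minimal sketch of the intended behavior, not part of the commit itself; it assumes BlueFog and its torch frontend are installed and that the script is launched under MPI (for example with bfrun -np 4), and the oversized graph is an illustrative stand-in.

import networkx
import bluefog.torch as bf
from bluefog.common import topology_util

bf.init()

# A ring over the actual process count matches bf.size() and is accepted.
bf.set_topology(topology_util.RingGraph(bf.size()))

# A graph built for the wrong number of nodes should now be rejected
# instead of being installed silently. The "+ 1" size is only for illustration.
wrong_size = networkx.complete_graph(bf.size() + 1, create_using=networkx.DiGraph())
try:
    bf.set_topology(wrong_size)
except TypeError as exc:
    print("rejected as expected:", exc)
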
bluefog/common/basics.py (181 changes: 117 additions & 64 deletions)
@@ -24,12 +24,12 @@
import bluefog.common.topology_util as topology_util


logger = logging.getLogger('bluefog')
logger = logging.getLogger("bluefog")
logger.setLevel(logging.INFO)

ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)-15s %(levelname)s %(message)s')
formatter = logging.Formatter("%(asctime)-15s %(levelname)s %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)

@@ -46,8 +46,11 @@ def __init__(self, pkg_path, *args):
self._is_machine_topo_weighted = False
self.warn_timeline = False

def init(self, topology_fn: Optional[Callable[[int], networkx.DiGraph]] = None,
is_weighted: bool = False):
def init(
self,
topology_fn: Optional[Callable[[int], networkx.DiGraph]] = None,
is_weighted: bool = False,
):
"""A function that initializes BlueFog.
Args:
@@ -128,7 +131,7 @@ def machine_rank(self) -> int:
# TODO(hhb) This only supports the homogeneous environment now. Currently it assumes all
# machines share the same local_size()
assert self.is_homogeneous(), "Only supports homogeneous environment now"
return self.rank()//self.local_size()
return self.rank() // self.local_size()

def machine_size(self) -> int:
"""A function that returns the BlueFog size of the machine.
@@ -139,7 +142,7 @@ def machine_size(self) -> int:
# TODO(hhb) This only supports the homogeneous environment now. Currently it assumes all
# machines share the same local_size()
assert self.is_homogeneous(), "Only supports homogeneous environment now"
return self.size()//self.local_size()
return self.size() // self.local_size()

def unified_mpi_window_model_supported(self) -> bool:
"""Returns a boolean value to indicate the MPI_Win model is unified or not.
@@ -162,8 +165,7 @@ def mpi_threads_supported(self) -> bool:
"""
mpi_threads_supported = self._MPI_LIB_CTYPES.bluefog_mpi_threads_supported()
if mpi_threads_supported == -1:
raise ValueError(
"BlueFog has not been initialized; use bf.init().")
raise ValueError("BlueFog has not been initialized; use bf.init().")
return mpi_threads_supported

def is_topo_weighted(self) -> bool:
@@ -208,9 +210,11 @@ def in_neighbor_machine_ranks(self) -> List[int]:
if self._machine_topology is None:
return []
_machine_rank = self.machine_rank()
in_neighbor_machine_ranks = [r for r in
self._machine_topology.predecessors(self.machine_rank())
if r != _machine_rank]
in_neighbor_machine_ranks = [
r
for r in self._machine_topology.predecessors(self.machine_rank())
if r != _machine_rank
]
return in_neighbor_machine_ranks

def in_neighbor_ranks(self) -> List[int]:
@@ -223,8 +227,9 @@ def in_neighbor_ranks(self) -> List[int]:
if self._topology is None:
return []
_rank = self.rank()
in_neighbor_ranks = [r for r in self._topology.predecessors(self.rank())
if r != _rank]
in_neighbor_ranks = [
r for r in self._topology.predecessors(self.rank()) if r != _rank
]
return in_neighbor_ranks

def out_neighbor_machine_ranks(self) -> List[int]:
@@ -237,9 +242,11 @@ def out_neighbor_machine_ranks(self) -> List[int]:
if self._machine_topology is None:
return []
_machine_rank = self.machine_rank()
out_neighbor_machine_ranks = [r for r in
self._machine_topology.successors(self.machine_rank())
if r != _machine_rank]
out_neighbor_machine_ranks = [
r
for r in self._machine_topology.successors(self.machine_rank())
if r != _machine_rank
]
return out_neighbor_machine_ranks

def out_neighbor_ranks(self) -> List[int]:
@@ -252,12 +259,14 @@ def out_neighbor_ranks(self) -> List[int]:
if self._topology is None:
return []
_rank = self.rank()
out_neighbor_ranks = [r for r in self._topology.successors(self.rank())
if r != _rank]
out_neighbor_ranks = [
r for r in self._topology.successors(self.rank()) if r != _rank
]
return out_neighbor_ranks

def set_machine_topology(self, topology: Optional[networkx.DiGraph],
is_weighted: bool = False) -> bool:
def set_machine_topology(
self, topology: Optional[networkx.DiGraph], is_weighted: bool = False
) -> bool:
"""A function that sets the virtual machine topology.
Args:
@@ -273,27 +282,35 @@ def set_machine_topology(self, topology: Optional[networkx.DiGraph],
>>> import bluefog.torch as bf
>>> from bluefog.common import topology_util
>>> bf.init()
>>> bf.set_machine_topology(topology_util.RingGraph(bf.size()))
>>> bf.set_machine_topology(topology_util.RingGraph(bf.machine_size()))
"""
if topology is None:
raise ValueError("Machine topology shall not be None.")

if not isinstance(topology, networkx.DiGraph):
raise TypeError("Machine topology must be a networkx.DiGraph obejct.")
if topology.number_of_nodes() != self.machine_size():
raise TypeError(
"Machine topology must be a networkx.DiGraph object with the same "
"number of nodes as bf.machine_size()."
)

assert self.is_homogeneous(), "Only supports homogeneous environment now"

if topology_util.IsTopologyEquivalent(topology, self._machine_topology):
if self.local_rank() == 0:
logger.debug("Machine topology to set is the same as old one. Skip the setting.")
logger.debug(
"Machine topology to set is the same as old one. Skip the setting."
)
return True

self._machine_topology = topology
self._is_machine_topo_weighted = is_weighted
return True

def set_topology(self, topology: Optional[networkx.DiGraph] = None,
is_weighted: bool = False) -> bool:
def set_topology(
self, topology: Optional[networkx.DiGraph] = None, is_weighted: bool = False
) -> bool:
"""A function that sets the virtual topology MPI used.
Args:
@@ -317,49 +334,73 @@ def set_topology(self, topology: Optional[networkx.DiGraph] = None,
topology = topology_util.ExponentialGraph(size=self.size())
if self.local_rank() == 0:
logger.info(
"Topology is not specified. Default Exponential Two topology is used.")
"Topology is not specified. Default Exponential Two topology is used."
)

if not isinstance(topology, networkx.DiGraph):
raise TypeError("topology must be a networkx.DiGraph obejct.")
if topology.number_of_nodes() != self.size():
raise TypeError(
"topology must be a networkx.DiGraph object with the same number of nodes as bf.size()."
)

if topology_util.IsTopologyEquivalent(topology, self._topology):
if self.local_rank() == 0:
logger.debug(
"Topology to set is the same as old one. Skip the setting.")
"Topology to set is the same as old one. Skip the setting."
)
return True

# We remove the self-rank for any cases because MPI graph_comm do not include it.
destinations = sorted([r for r in topology.successors(self.rank())
if r != self.rank()])
sources = sorted([r for r in topology.predecessors(self.rank())
if r != self.rank()])
destinations = sorted(
[r for r in topology.successors(self.rank()) if r != self.rank()]
)
sources = sorted(
[r for r in topology.predecessors(self.rank()) if r != self.rank()]
)
indegree = len(sources)
outdegree = len(destinations)
sources_type = ctypes.c_int * indegree
destinations_type = ctypes.c_int * outdegree

if not is_weighted:
self._MPI_LIB_CTYPES.bluefog_set_topology.argtypes = (
[ctypes.c_int, ctypes.POINTER(ctypes.c_int),
ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
)
self._MPI_LIB_CTYPES.bluefog_set_topology.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_int),
ctypes.c_int,
ctypes.POINTER(ctypes.c_int),
]
ret = self._MPI_LIB_CTYPES.bluefog_set_topology(
indegree, sources_type(*sources),
outdegree, destinations_type(*destinations))
indegree,
sources_type(*sources),
outdegree,
destinations_type(*destinations),
)
else:
# Here the source_weights is a vector containing weights from source, i.e.,
# (in-)neighbors, converted from the neighbor_weights dictionary.
self_weight, neighbor_weights = topology_util.GetRecvWeights(topology, self.rank())
source_weights = [neighbor_weights[r] for r in sorted(neighbor_weights.keys())]
source_weights_type = ctypes.c_float * indegree
self._MPI_LIB_CTYPES.bluefog_set_topology_with_weights.argtypes = (
[ctypes.c_int, ctypes.POINTER(ctypes.c_int),
ctypes.c_int, ctypes.POINTER(ctypes.c_int),
ctypes.c_float, ctypes.POINTER(ctypes.c_float)]
self_weight, neighbor_weights = topology_util.GetRecvWeights(
topology, self.rank()
)
source_weights = [
neighbor_weights[r] for r in sorted(neighbor_weights.keys())
]
source_weights_type = ctypes.c_float * indegree
self._MPI_LIB_CTYPES.bluefog_set_topology_with_weights.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_int),
ctypes.c_int,
ctypes.POINTER(ctypes.c_int),
ctypes.c_float,
ctypes.POINTER(ctypes.c_float),
]
ret = self._MPI_LIB_CTYPES.bluefog_set_topology_with_weights(
indegree, sources_type(*sources),
outdegree, destinations_type(*destinations),
self_weight, source_weights_type(*source_weights)
indegree,
sources_type(*sources),
outdegree,
destinations_type(*destinations),
self_weight,
source_weights_type(*source_weights),
)
if ret != 1:
if self.local_rank() == 0:
@@ -437,16 +478,21 @@ def timeline_start_activity(self, tensor_name: str, activity_name: str) -> bool:
if self.warn_timeline:
# We know timeline didn't turn on. No need to repeat it.
return False
self._MPI_LIB_CTYPES.bluefog_timeline.argtypes = (
[ctypes.c_bool, ctypes.c_char_p, ctypes.c_char_p]
)
self._MPI_LIB_CTYPES.bluefog_timeline.argtypes = [
ctypes.c_bool,
ctypes.c_char_p,
ctypes.c_char_p,
]
ret = self._MPI_LIB_CTYPES.bluefog_timeline(
True, tensor_name.encode("utf-8"), activity_name.encode('utf-8'))
True, tensor_name.encode("utf-8"), activity_name.encode("utf-8")
)
if ret != 1 and not self.warn_timeline:
logger.error("Cannot start activity in the timeline. "
"Most common reason is you didn't turn on the timeline function. "
"Use bfrun --timeline-filename file_name ... or "
"setting the ENV variable BLUEFOG_TIMELINE = file_name")
logger.error(
"Cannot start activity in the timeline. "
"Most common reason is you didn't turn on the timeline function. "
"Use bfrun --timeline-filename file_name ... or "
"setting the ENV variable BLUEFOG_TIMELINE = file_name"
)
self.warn_timeline = True
return False
return True
Expand All @@ -459,16 +505,21 @@ def timeline_end_activity(self, tensor_name: str) -> bool:
if self.warn_timeline:
# We know timeline didn't turn on. No need to repeat it.
return False
self._MPI_LIB_CTYPES.bluefog_timeline.argtypes = (
[ctypes.c_bool, ctypes.c_char_p, ctypes.c_char_p]
)
self._MPI_LIB_CTYPES.bluefog_timeline.argtypes = [
ctypes.c_bool,
ctypes.c_char_p,
ctypes.c_char_p,
]
ret = self._MPI_LIB_CTYPES.bluefog_timeline(
False, tensor_name.encode("utf-8"), "".encode('utf-8'))
False, tensor_name.encode("utf-8"), "".encode("utf-8")
)
if ret != 1 and not self.warn_timeline:
logger.error("Cannot end activity in the timeline. Check "
"Most common reason is you didn't turn on the timeline function. "
"Use bfrun --timeline-filename file_name ... or "
"setting the ENV variable BLUEFOG_TIMELINE = file_name")
logger.error(
"Cannot end activity in the timeline. Check "
"Most common reason is you didn't turn on the timeline function. "
"Use bfrun --timeline-filename file_name ... or "
"setting the ENV variable BLUEFOG_TIMELINE = file_name"
)
self.warn_timeline = True
return False
return True
@@ -501,7 +552,8 @@ def suspend(self):
"""
if not util.is_running_from_ipython():
raise EnvironmentError(
"This function should be used only when you are under ipython environment.")
"This function should be used only when you are under ipython environment."
)
self._MPI_LIB_CTYPES.bluefog_suspend()

def resume(self):
Expand All @@ -511,5 +563,6 @@ def resume(self):
"""
if not util.is_running_from_ipython():
raise EnvironmentError(
"This function should be used only when you are under ipython environment.")
"This function should be used only when you are under ipython environment."
)
self._MPI_LIB_CTYPES.bluefog_resume()
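
As a side note on the reformatted ctypes calls: before invoking bluefog_set_topology, set_topology strips the self-rank from the predecessor/successor lists and packs the neighbor ranks into C int arrays. The sketch below reproduces only that Python-side bookkeeping with plain networkx and ctypes; the 4-node ring and the rank value are illustrative stand-ins, not part of the commit.

import ctypes
import networkx

# Illustrative stand-ins: a 4-node directed ring and the rank of "this" process.
topology = networkx.DiGraph([(0, 1), (1, 2), (2, 3), (3, 0)])
rank = 1

# The self-rank is excluded because the MPI graph communicator does not include it.
sources = sorted(r for r in topology.predecessors(rank) if r != rank)
destinations = sorted(r for r in topology.successors(rank) if r != rank)
indegree, outdegree = len(sources), len(destinations)

# Pack the neighbor ranks into C int arrays, mirroring sources_type(*sources)
# and destinations_type(*destinations) in set_topology.
sources_arr = (ctypes.c_int * indegree)(*sources)
destinations_arr = (ctypes.c_int * outdegree)(*destinations)

print(list(sources_arr), list(destinations_arr))  # [0] [2]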
