import psutil
import mosaic
from .runtime import Runtime, RuntimeProxy
from .monitor import MonitoredResource
from ..utils import LoggerManager
from ..utils import subprocess
from ..utils.utils import memory_limit, cpu_count
from ..profile import profiler, global_profiler

__all__ = ['Node']

class Node(Runtime):
    """
    A node represents a physically independent portion of the network,
    such as a separate cluster node. Nodes contain one or more
    workers, which they initialise and manage.

    """

    is_node = True

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        num_workers = kwargs.pop('num_workers', None)
        num_workers = num_workers or 1

        self._own_workers = dict()
        self._num_workers = num_workers
        self._num_threads = None
        self._memory_limit = memory_limit()

        self._monitored_node = MonitoredResource(self.uid)

        self._monitor_interval = None
        self._update_interval = None

    async def init(self, **kwargs):
        """
        Asynchronous counterpart of ``__init__``.

        Parameters
        ----------
        kwargs

        Returns
        -------

        """
        await super().init(**kwargs)

        # Start local cluster
        await self.init_warehouse(indices=self.indices[0], **kwargs)
        await self.init_workers(**kwargs)

    async def init_workers(self, **kwargs):
        """
        Init workers in the node.

        Parameters
        ----------
        kwargs

        Returns
        -------

        """
        if self.mode == 'cluster':
            pass

        num_cpus = cpu_count()
        num_workers = self._num_workers

        num_threads = kwargs.pop('num_threads', None)
        num_threads = num_threads or num_cpus // num_workers
        self._num_threads = num_threads

        if num_workers*num_threads > num_cpus:
            raise ValueError('Requested number of CPUs per node (%d - num_workers*num_threads) '
                             'is greater than the number of available CPUs (%d)' % (num_workers*num_threads, num_cpus))
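
        # With the default above, available cores are split evenly across workers,
        # e.g. a hypothetical 16-core node with num_workers=4 yields num_threads=4,
        # so num_workers*num_threads exactly fills the node.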

        # Find all available NUMA nodes and CPUs per node
        try:
            import numa
            numa_available = numa.info.numa_available()
        except Exception:
            numa_available = False

        if numa_available:
            available_cpus = numa.info.numa_hardware_info()['node_cpu_info']
            allowed_cpus = numa.schedule.get_allowed_cpus_num()
        else:
            available_cpus = {worker_index: list(range(num_threads*worker_index,
                                                       num_threads*(worker_index+1)))
                              for worker_index in range(self._num_workers)}
            allowed_cpus = sum([len(c) for c in available_cpus.values()])

        # Eliminate cores corresponding to hyperthreading
        for node_index, node_cpus in available_cpus.items():
            node_cpus = [each for each in node_cpus if each < num_cpus]
            available_cpus[node_index] = node_cpus

        total_cpus = sum([len(c) for c in available_cpus.values()])

        worker_cpus = {}
        worker_nodes = {}
        if total_cpus <= allowed_cpus:
            node_ids = list(available_cpus.keys())
            num_nodes = len(available_cpus)
            num_cpus_per_node = min([len(cpus) for cpus in available_cpus.values()])

            # Distribute cores across workers
            worker_cpus = {}
            worker_nodes = {}

            if num_nodes >= self._num_workers:
                nodes_per_worker = num_nodes // self._num_workers

                for worker_index in range(self._num_workers):
                    node_s = worker_index*nodes_per_worker
                    node_e = min((worker_index+1)*nodes_per_worker, num_nodes)

                    worker_cpus[worker_index] = sum([available_cpus[node_index]
                                                     for node_index in node_ids[node_s:node_e]], [])
                    worker_nodes[worker_index] = node_ids[node_s:node_e]

            else:
                workers_per_node = self._num_workers // num_nodes
                cpus_per_worker = num_cpus_per_node // workers_per_node

                for node_index, node_cpus in available_cpus.items():
                    worker_s = node_index*workers_per_node
                    worker_e = min((node_index+1)*workers_per_node, self._num_workers)

                    worker_chunk = {}
                    for worker_index in range(worker_s, worker_e):
                        cpu_s = worker_index*cpus_per_worker
                        cpu_e = min((worker_index+1)*cpus_per_worker, len(node_cpus))

                        worker_chunk[worker_index] = node_cpus[cpu_s:cpu_e]
                        worker_nodes[worker_index] = [node_index]

                    worker_cpus.update(worker_chunk)
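
        # At this point worker_cpus maps each worker index to the CPU IDs it may be
        # pinned to, and worker_nodes to the NUMA nodes it spans: with at least as many
        # NUMA nodes as workers each worker takes whole nodes, otherwise each node's
        # cores are divided among the workers placed on it. If more CPUs are visible
        # than this process is allowed to use, both dicts stay empty and workers are
        # started without an explicit CPU affinity.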

        # Initialise workers
        for worker_index in range(self._num_workers):
            indices = self.indices + (worker_index,)
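
            # Each worker is launched in a separate child process; start_worker is the
            # entry point executed in that process and boots a worker runtime there
            # via mosaic.init.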
            def start_worker(*args, **extra_kwargs):
                kwargs.update(extra_kwargs)
                kwargs['runtime_indices'] = indices
                kwargs['num_workers'] = num_workers
                kwargs['num_threads'] = num_threads

                mosaic.init('worker', *args, **kwargs, wait=True)

            worker_proxy = RuntimeProxy(name='worker', indices=indices)
            worker_subprocess = subprocess(start_worker)(name=worker_proxy.uid,
                                                         daemon=False,
                                                         cpu_affinity=worker_cpus.get(worker_index, None))
            worker_subprocess.start_process()
            worker_proxy.subprocess = worker_subprocess

            self._workers[worker_proxy.uid] = worker_proxy
            self._own_workers[worker_proxy.uid] = worker_proxy

            await self._comms.wait_for(worker_proxy.uid)

        self.resource_monitor()

        await self.update_monitored_node()

    def set_logger(self):
        """
        Set up logging.

        Returns
        -------

        """
        self.logger = LoggerManager()

        if self.mode == 'local':
            self.logger.set_local(format=self.mode)
        else:
            runtime_id = 'head' if self.mode == 'interactive' else 'monitor'
            self.logger.set_remote(runtime_id=runtime_id, format=self.mode)

    def set_profiler(self):
        """
        Set up profiling.

        Returns
        -------

        """
        global_profiler.set_remote('monitor')
        super().set_profiler()

    def resource_monitor(self):
        """
        Monitor resources available for workers, and worker state.

        Returns
        -------

        """
        try:
            import GPUtil
            gpus = GPUtil.getGPUs()
        except (ImportError, ValueError):
            gpus = []

        cpu_load = 0.
        memory_fraction = 0.

        self._monitored_node.add_group('gpus')
        self._monitored_node.add_group('workers')

        for gpu in gpus:
            gpu_id = str(gpu.id)

            resource = self._monitored_node.add_resource('gpus', gpu_id)
            resource.update(dict(gpu_load=gpu.load,
                                 memory_limit=gpu.memoryTotal*1024**2,
                                 memory_fraction=gpu.memoryUtil))

        for worker_id, worker in self._own_workers.items():
            resource = self._monitored_node.add_resource('workers', worker_id)

            worker_cpu_load = worker.subprocess.cpu_load()
            worker_memory_fraction = worker.subprocess.memory() / self._memory_limit

            cpu_load += worker_cpu_load
            memory_fraction += worker_memory_fraction

            resource.update(dict(state=worker.subprocess.state,
                                 cpu_load=worker_cpu_load,
                                 memory_fraction=worker_memory_fraction))

        self._monitored_node.sort_resources('workers', 'memory_fraction', desc=True)
        sub_resources = self._monitored_node.sub_resources['workers']
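
        # Simple back-pressure on memory: above 95% combined usage the heaviest
        # running worker is paused; below the threshold one paused worker (the
        # lightest first) is resumed again.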
        if memory_fraction > 0.95:
            self._monitored_node.sort_resources('workers', 'memory_fraction', desc=True)

            for worker_id, worker in sub_resources.items():
                if self._own_workers[worker_id].subprocess.paused():
                    continue

                self._own_workers[worker_id].subprocess.pause_process()
                sub_resources[worker_id].state = self._own_workers[worker_id].subprocess.state
                break

        else:
            self._monitored_node.sort_resources('workers', 'memory_fraction', desc=False)

            for worker_id, worker in sub_resources.items():
                if self._own_workers[worker_id].subprocess.running():
                    continue

                self._own_workers[worker_id].subprocess.start_process()
                sub_resources[worker_id].state = self._own_workers[worker_id].subprocess.state
                break

        # TODO Dynamic constraints and shared resources
        self._monitored_node.update(dict(num_cpus=psutil.cpu_count(),
                                         num_gpus=len(gpus),
                                         num_workers=self._num_workers,
                                         num_threads=self._num_threads,
                                         cpu_load=cpu_load,
                                         memory_limit=self._memory_limit,
                                         memory_fraction=memory_fraction))

    async def heart(self, sender_id=None):
        # On the first heartbeat, start periodic monitoring: sample local resources
        # every second and push an update to the monitor runtime every 10 seconds.
        if self._monitor_interval is None:
            self._monitor_interval = self._loop.interval(self.resource_monitor, interval=1)
            self._update_interval = self._loop.interval(self.update_monitored_node, interval=10)

    async def stop(self, sender_id=None):
        """
        Stop runtime.

        Parameters
        ----------
        sender_id : str

        Returns
        -------

        """
        if profiler.tracing:
            profiler.stop()

        # Close warehouse
        await self._local_warehouse.stop()
        self._local_warehouse.subprocess.join_process()

        # Close workers
        for worker_id, worker in self._own_workers.items():
            await worker.stop()
            worker.subprocess.join_process()

        await super().stop(sender_id)

    async def update_monitored_node(self):
        """
        Send status update to monitor.

        Returns
        -------

        """
        history, sub_resources = self._monitored_node.get_update()

        await self._comms.send_async('monitor',
                                     method='update_node',
                                     update=history,
                                     sub_resources=sub_resources)