__version__ = '1.2'
import os
import asyncio
from .core import tessera
from .runtime import Head, Monitor, Node, Worker, Warehouse
from .utils.subprocess import subprocess
from .utils import logger as mlogger
from .utils import gather, default_logger
from .file_manipulation import h5
from .profile import profiler
_runtime = None
_runtime_types = {
'head': Head,
'monitor': Monitor,
'node': Node,
'worker': Worker,
'warehouse': Warehouse,
}
[docs]
def init(runtime_type='head', runtime_indices=(),
address=None, port=None,
parent_id=None, parent_address=None, parent_port=None,
monitor_address=None, monitor_port=None, pubsub_port=None,
num_workers=1, num_threads=None,
mode='local', reuse_head=False, monitor_strategy='round-robin',
log_level='perf', profile=False, node_list=None,
asyncio_loop=None, dump_init=False, wait=False,
**kwargs):
"""
Starts the global mosaic runtime.
Parameters
----------
runtime_type : str, optional
Type of runtime to instantiate, defaults to ``head``.
runtime_indices : tuple, optional
Indices associated with the runtime, defaults to None.
address : str, optional
Address to use for the runtime, defaults to None. If None, the comms will
try to guess the address.
port : int, optional
Port to use for the runtime, defaults to None. If None, the comms will
test ports until one becomes available.
parent_id : str, optional
UID of the parent runtime, if any.
parent_address : str, optional
Address of the parent runtime, if any.
parent_port : int, optional
Port of the parent runtime, if any.
monitor_address : str, optional
Address of the monitor to connect to.
monitor_port : int, optional
Port of the monitor to connect to.
pubsub_port : int, optional
Publishing port of the monitor to connect to.
num_workers : int, optional
Number of workers to instantiate in each node, defaults to 1.
num_threads : int, optional
Number of threads to assign to each worker, defaults to the number of
available cores over ``num_workers``.
mode : str, optional
Mode of the runtime, defaults to ``local``.
reuse_head : bool, optional
Whether to set up workers in the head node, defaults to False.
monitor_strategy : str, optional
Strategy used by the monitor to allocate tessera, defaults to round robin.
log_level : str, optional
Log level, defaults to ``perf``.
profile : bool, optional
Whether to start the profiler, defaults to False.
node_list : list, optional
List of available node addresses to connect to.
asyncio_loop: object, optional
Async loop to use in our mosaic event loop, defaults to new loop.
dump_init : bool, optional
Whether to dump initialisation file.
wait : bool, optional
Whether or not to return control to calling frame, defaults to False.
kwargs : optional
Extra keyword arguments.
Returns
-------
"""
global _runtime
if _runtime is not None:
return _runtime
mlogger.log_level = log_level
runtime_config = {
'runtime_indices': runtime_indices,
'mode': mode,
'reuse_head': reuse_head,
'monitor_strategy': monitor_strategy,
'num_workers': num_workers,
'num_threads': num_threads,
'log_level': log_level,
'profile': profile,
'node_list': node_list,
'dump_init': dump_init,
}
if address is not None and port is not None:
runtime_config['address'] = address
runtime_config['port'] = port
if parent_id is not None and parent_address is not None and parent_port is not None:
runtime_config['parent_id'] = parent_id
runtime_config['parent_address'] = parent_address
runtime_config['parent_port'] = parent_port
if monitor_address is not None and monitor_port is not None and pubsub_port is not None:
runtime_config['monitor_address'] = monitor_address
runtime_config['monitor_port'] = monitor_port
runtime_config['pubsub_port'] = pubsub_port
# Create global runtime
try:
_runtime = _runtime_types[runtime_type](**runtime_config)
except KeyError:
raise KeyError('Endpoint type is not recognised, available types are head, '
'monitor, node and worker')
loop = _runtime.get_event_loop(asyncio_loop=asyncio_loop)
result = loop.run(_runtime.init, **runtime_config)
if profile:
profiler.start()
if wait is True:
try:
loop.run_forever()
finally:
loop.stop()
return result
def __getattr__(key):
global _runtime
try:
return getattr(_runtime, key)
except AttributeError:
raise AttributeError('module mosaic has no attribute %s' % key)
def clear_runtime():
"""
Clear the global runtime.
Returns
-------
"""
global _runtime
if _runtime is not None:
mlogger.clear_logger()
del _runtime
_runtime = None
def runtime():
"""
Access the global runtime.
Returns
-------
"""
global _runtime
return _runtime
def logger():
"""
Access the runtime logger.
Returns
-------
"""
global _runtime
if _runtime is not None:
return _runtime.logger
else:
return default_logger
def stop():
"""
Stop the global runtime.
Returns
-------
"""
global _runtime
loop = _runtime.get_event_loop()
try:
loop.run(_runtime.stop)
finally:
loop.stop()
clear_runtime()
[docs]
def run(main, *args, **kwargs):
"""
Initialise the runtime and then run the ``main`` in it.
Parameters
----------
main : callable
Entry point for mosaic.
args : tuple, optional
Arguments to `mosaic.init`.
kwargs : optional
Keyword arguments to `mosaic.init`.
Returns
-------
"""
global _runtime
monitor_address = kwargs.get('monitor_address', None)
if monitor_address is None:
path = os.path.join(os.getcwd(), 'mosaic-workspace')
if not os.path.exists(path):
os.makedirs(path)
filename = os.path.join(path, 'monitor.key')
if os.path.exists(filename):
with open(filename, 'r') as file:
file.readline()
_ = file.readline().split('=')[1].strip()
parent_address = file.readline().split('=')[1].strip()
parent_port = file.readline().split('=')[1].strip()
pubsub_port = file.readline().split('=')[1].strip()
kwargs['monitor_address'] = parent_address
kwargs['monitor_port'] = int(parent_port)
kwargs['pubsub_port'] = int(pubsub_port)
try:
arg_start = file.readline().strip()
except EOFError:
pass
else:
if arg_start == '[ARGS]':
for line in file:
key, value = line.strip().split('=')
kwargs[key] = kwargs.get(key, eval(value))
init(*args, **kwargs)
loop = _runtime.get_event_loop()
async def _main():
await main(_runtime)
try:
loop.run(_main)
finally:
stop()
async def interactive(switch, *args, **kwargs):
"""
Initialise the runtime interactively.
Parameters
----------
switch : str
Whether to switch interactive mode ``on`` or ``off``.
args : tuple, optional
Arguments to `mosaic.init`.
kwargs : optional
Keyword arguments to `mosaic.init`.
Returns
-------
"""
global _runtime
if switch == 'on':
if _runtime is not None:
return
fut = init(*args, **kwargs,
mode='interactive',
asyncio_loop=asyncio.get_event_loop())
await fut
else:
if _runtime is None:
return
loop = _runtime.get_event_loop()
try:
await loop.run(_runtime.stop)
finally:
clear_runtime()