import os
import sys
import glob
import copy
import time
import atexit
import pickle
import inspect
import asyncio
import datetime
import functools
from collections import OrderedDict
from cached_property import cached_property
import mosaic
try:
import _profile
except ImportError:
pass
__all__ = ['Profiler', 'GlobalProfiler', 'profiler',
'global_profiler', 'skip_profile', 'no_profiler',
'use_trace']
filter_modules = []
profiler = None
global_profiler = None
def _format_info(filename, lineno, func_name, cls_name=None):
if cls_name:
func_name = '%s.%s' % (cls_name, func_name)
info = {
'name': func_name,
'filename': filename,
'lineno': lineno,
}
return info
def _full_info(frame):
func_code = frame.f_code
func_name = func_code.co_name
full_path = func_code.co_filename
lineno = frame.f_lineno
# first, let's whether we should even go on
if full_path.startswith('<') and full_path.endswith('>'):
return _format_info(full_path, lineno, func_name)
path, filename = os.path.split(full_path)
for module in filter_modules:
if module in path:
return _format_info(full_path, lineno, func_name)
# then, let's see if it is a builtin
if func_name.startswith('<') and func_name.endswith('>'):
return _format_info(full_path, lineno, func_name)
# otherwise, it could be directly defined in the module
module = inspect.getmodule(frame)
func = getattr(module, func_name, None)
if func is not None and getattr(func, '__code__', None) is func_code:
return _format_info(full_path, lineno, func_name)
# if not, it might belong to a class at the first level of the
# module
for cls_name in dir(module):
cls = getattr(module, cls_name)
if not inspect.isclass(cls):
continue
# see if this class has a method with the name
# we're looking for
try:
method = vars(cls)[func_name]
except KeyError:
continue
# unwrap the method just in case there are any decorators
try:
method = inspect.unwrap(method)
except ValueError:
pass
# see if this is the method that called us
if getattr(method, '__code__', None) is func_code:
return _format_info(full_path, lineno, func_name, cls_name)
# if not at the top level, the class might be given as a free variable
# within the locals
if '__class__' in func_code.co_freevars:
try:
cls_name = frame.f_locals['__class__'].__name__
return _format_info(full_path, lineno, func_name, cls_name)
except KeyError:
pass
# if not a free variable, then it might be given as the first
# argument to the function
try:
self_name = func_code.co_varnames[0]
except IndexError:
return _format_info(full_path, lineno, func_name)
try:
self_type = type(frame.f_locals[self_name])
except KeyError:
return _format_info(full_path, lineno, func_name)
for cls in self_type.__mro__:
# see if this class has a method with the name
# we're looking for
try:
method = vars(cls)[func_name]
except KeyError:
continue
# assume that this is the method that called
cls_name = cls.__name__
return _format_info(full_path, lineno, func_name, cls_name)
return _format_info(full_path, lineno, func_name)
def trace(frame, event, arg):
frame.f_trace_lines = False
if not len(profiler.active_traces):
return trace
if event not in ['call', 'return', 'exception']:
return
if event == 'exception':
return trace
if event == 'call':
frame_id = profiler.f_call(frame)
if frame_id:
return trace
elif event == 'return':
profiler.f_return(frame)
def profiled_task_factory(loop, coro):
if profiler.tracing:
sys.settrace(None)
# create task
child_task = asyncio.tasks.Task(coro, loop=loop)
if not profiler.tracing:
return child_task
if not len(profiler.active_traces):
profiler.maybe_trace()
return child_task
# find the outer and inner frames,
# delete the node corresponding to the current frame
trace_id, outer_frame_id, curr_frame_id = profiler._del_node()
if outer_frame_id is None:
profiler.maybe_trace()
return child_task
if not hasattr(coro, 'cr_frame') and not hasattr(coro, 'gi_frame'):
profiler.maybe_trace()
return child_task
try:
frame = coro.cr_frame
except AttributeError:
frame = coro.gi_frame
# substitute it with a new one for the coro
profiler._new_node(trace_id, outer_frame_id, frame, timeit=False)
profiler.maybe_trace()
return child_task
def dict_diff(dict_1, dict_2):
diff_dict = OrderedDict()
for key, value in dict_2.items():
if key not in dict_1:
diff_dict[key] = value
elif isinstance(value, dict):
diff_dict[key] = dict_diff(dict_1[key], value)
return diff_dict
def dict_update(dict_1, dict_2):
for key, value in dict_2.items():
if isinstance(value, dict):
dict_1[key] = dict_update(dict_1.get(key, {}), value)
else:
dict_1[key] = value
return dict_1
[docs]
class Profiler:
"""
The profiler controls profiling for the local runtime, including starting and stopping
the profile, and starting new traces.
"""
def __init__(self):
self.nodes = OrderedDict()
self.active_nodes = dict()
self.async_nodes = set()
self.hash_nodes = dict()
self.skip_nodes = dict()
self.task_nodes = dict()
self.traces = dict()
self.active_traces = dict()
self.t_start = None
self.t_end = None
self.t_elapsed = None
self._tracing = False
self._runtime_id = None
[docs]
def maybe_trace(self):
if self.tracing and len(self.active_traces):
sys.settrace(trace)
[docs]
def maybe_stop_trace(self):
if not len(self.active_traces):
sys.settrace(None)
[docs]
def clear(self):
"""
Clear profiler.
Returns
-------
"""
self.nodes = OrderedDict()
self.active_nodes = dict()
self.async_nodes = set()
self.hash_nodes = dict()
self.skip_nodes = dict()
self.traces = dict()
self.active_traces = dict()
self.t_start = None
self.t_end = None
self.t_elapsed = None
[docs]
def start(self):
"""
Start profiling.
Returns
-------
"""
self.clear()
self.t_start = time.time()
self._tracing = True
runtime = mosaic.runtime()
self._runtime_id = runtime.uid if runtime is not None else 'head'
# loop = asyncio.get_event_loop()
# loop.set_task_factory(profiled_task_factory)
if global_profiler is not None:
global_profiler.start()
[docs]
def stop(self):
"""
Stop profiling.
Returns
-------
"""
active_traces = list(self.active_traces.keys())
for trace_id in active_traces:
self.stop_trace(trace_id)
self.t_end = time.time()
self.t_elapsed = self.t_end - self.t_start
self._tracing = False
sys.settrace(None)
if global_profiler is not None:
global_profiler.stop()
[docs]
def start_trace(self, trace_id=None, level=1):
"""
Start a new profiling trace.
Parameters
----------
trace_id : str, optional
Optional trace_id for this trace.
level : int, optional
Level at which the trace should start.
Returns
-------
"""
if not self.tracing:
return
root_frame = sys._getframe(level)
if trace_id is None:
trace_id = self._frame_id(root_frame)
else:
self.hash_nodes[trace_id] = 1
t_start = time.time()
new_node = OrderedDict(name='trace:%d' % len(self.traces),
frame_id=trace_id,
trace_id=trace_id,
t_start=t_start)
try:
outer_frame_id = root_frame.f_locals['__frame_id__']
outer_trace_id = root_frame.f_locals['__trace_id__']
root_frame.f_locals['__prev_frame_id__'] = outer_frame_id
root_frame.f_locals['__prev_trace_id__'] = outer_trace_id
self.active_nodes[outer_frame_id][trace_id] = new_node
except KeyError:
self.nodes[trace_id] = new_node
root_frame.f_locals['__frame_id__'] = trace_id
root_frame.f_locals['__trace_id__'] = trace_id
root_frame.f_trace_lines = False
self.active_nodes[trace_id] = new_node
self.traces[trace_id] = new_node
self.active_traces[trace_id] = new_node
is_async = _profile.is_async(root_frame)
if is_async:
self.async_nodes.add(trace_id)
self.maybe_trace()
return trace_id
[docs]
def stop_trace(self, trace_id=None, level=1):
"""
Stop a profiling trace.
Returns
-------
"""
_trace_id, _, _ = self._del_node(level=level)
if not self.tracing:
return
root_frame = sys._getframe(level)
try:
outer_frame_id = root_frame.f_locals['__prev_frame_id__']
outer_trace_id = root_frame.f_locals['__prev_trace_id__']
root_frame.f_locals['__frame_id__'] = outer_frame_id
root_frame.f_locals['__trace_id__'] = outer_trace_id
except KeyError:
pass
trace_id = trace_id or _trace_id
try:
node = self.active_nodes[trace_id]
del self.active_nodes[trace_id]
del self.active_traces[trace_id]
except KeyError:
return
t_end = time.time()
node['t_end'] = t_end
node['t_elapsed'] = node['t_end'] - node['t_start']
if trace_id in self.async_nodes:
self.async_nodes.remove(trace_id)
self.maybe_stop_trace()
[docs]
def frame_info(self, level=1):
frame = sys._getframe(level)
try:
trace_id = frame.f_locals['__trace_id__']
frame_id = frame.f_locals['__frame_id__']
except KeyError:
return None, None
return trace_id, frame_id
@property
def tracing(self):
"""
Whether or not the profiler is activated.
"""
return self._tracing
[docs]
def f_call(self, frame):
"""
Event fired when function is called.
Parameters
----------
frame
Returns
-------
"""
if not self.tracing:
return
try:
outer_frame_id = frame.f_back.f_locals['__frame_id__']
trace_id = frame.f_back.f_locals['__trace_id__']
except KeyError:
return
return self._new_node(trace_id, outer_frame_id, frame)
[docs]
def f_return(self, frame):
"""
Event fired when function returns.
Parameters
----------
frame
Returns
-------
"""
if not self.tracing:
return
try:
frame_id = frame.f_locals['__frame_id__']
trace_id = frame.f_locals['__trace_id__']
except KeyError:
return
is_async = _profile.is_async(frame)
is_suspended = is_async and _profile.is_suspended(frame)
if is_suspended:
return
if is_async and frame_id in self.async_nodes:
self.async_nodes.remove(frame_id)
try:
node = self.active_nodes[frame_id]
del self.active_nodes[frame_id]
except KeyError:
return
t_end = time.time()
node['t_end'] = t_end
node['t_elapsed'] = node['t_end'] - node['t_start']
def _new_node(self, trace_id, outer_frame_id, frame, timeit=True):
if outer_frame_id not in self.active_nodes:
return
# assign or retrieve a frame ID
try:
frame_id = frame.f_locals['__frame_id__']
except KeyError:
frame_id = self._frame_id(frame)
frame.f_locals['__frame_id__'] = frame_id
# assign a trace ID
frame.f_locals['__trace_id__'] = trace_id
# check for suspended async frame
is_async = _profile.is_async(frame)
if is_async:
if frame_id in self.async_nodes:
new_node = self.active_nodes[frame_id]
if new_node.get('t_start') is None:
new_node['t_start'] = time.time()
return frame_id
self.async_nodes.add(frame_id)
# create new node
frame_info = _full_info(frame)
t_start = time.time() if timeit else None
new_node = OrderedDict(**frame_info,
frame_id=frame_id,
trace_id=trace_id,
t_start=t_start)
try:
self.active_nodes[outer_frame_id][frame_id] = new_node
self.active_nodes[frame_id] = new_node
except KeyError:
pass
return frame_id
def _del_node(self, level=1):
curr_frame = sys._getframe(level)
outer_frame = sys._getframe(level+1)
try:
outer_frame_id = outer_frame.f_locals['__frame_id__']
trace_id = outer_frame.f_locals['__trace_id__']
except KeyError:
return None, None, None
try:
curr_frame_id = curr_frame.f_locals['__frame_id__']
del self.active_nodes[outer_frame_id][curr_frame_id]
del self.active_nodes[curr_frame_id]
except KeyError:
return trace_id, outer_frame_id, None
return trace_id, outer_frame_id, curr_frame_id
def _frame_id(self, frame):
frame_id = str(id(frame))
if frame_id not in self.hash_nodes:
self.hash_nodes[frame_id] = 0
count = self.hash_nodes[frame_id] = self.hash_nodes[frame_id] + 1
return '%s.%s.%d' % (self._runtime_id, frame_id, count)
profiler = Profiler()
[docs]
def skip_profile(*args, **kwargs):
"""
Skip profiling for a decorated function:
>>> @skip_profile
>>> def function_not_profiled():
>>> pass
Parameters
----------
stop_trace : bool, optional
Whether to stop the system trace, defaults to False.
Returns
-------
"""
def make_wrapper(func, stop_trace):
if not profiler.tracing:
return func
profiler._del_node()
@functools.wraps(func)
def wrapper(*_args, **_kwargs):
with no_profiler('skip_profile:%s' % func.__name__,
level=2,
stop_trace=stop_trace):
ret = func(*_args, **_kwargs)
return ret
return wrapper
if len(args) == 1 and callable(args[0]):
profiler._del_node()
return make_wrapper(args[0], False)
else:
def _skip_profile(func):
profiler._del_node()
return make_wrapper(func, kwargs.get('stop_trace', False))
return _skip_profile
[docs]
class no_profiler:
"""
Skip profiling for a code block:
>>> with no_profiler():
>>> pass
Parameters
----------
stop_trace : bool, optional
Whether to stop the system trace, defaults to False.
Returns
-------
"""
def __init__(self, name='no_profiler', level=1, stop_trace=False):
profiler._del_node(level=level)
self._name = name
self._level = level
self._stop_trace = stop_trace
self._node = None
self._outer_node = None
self._frame_id = None
def __enter__(self):
if not profiler.tracing:
return
trace_id, outer_frame_id, curr_frame_id = profiler._del_node(level=self._level)
if outer_frame_id:
if self._stop_trace:
sys.settrace(None)
t_start = time.time()
new_node = OrderedDict(name=self._name, t_start=t_start)
self._node = new_node
profiler.active_nodes[outer_frame_id][curr_frame_id] = new_node
self._frame_id = outer_frame_id
self._outer_node = profiler.active_nodes[outer_frame_id]
del profiler.active_nodes[outer_frame_id]
def __exit__(self, exc_type, exc_val, exc_tb):
if not profiler.tracing:
return
if self._frame_id is not None:
if self._stop_trace:
profiler.maybe_trace()
t_end = time.time()
self._node['t_end'] = t_end
self._node['t_elapsed'] = self._node['t_end'] - self._node['t_start']
profiler.active_nodes[self._frame_id] = self._outer_node
class use_trace:
def __init__(self, trace_id, outer_frame_id=None, level=1):
profiler._del_node(level=level)
self._trace_id = trace_id
self._outer_frame_id = outer_frame_id
self._level = level
self._prev_trace_id = None
self._prev__outer_frame_id = None
self._frame = None
def __enter__(self):
if not profiler.tracing:
return
trace_id, outer_frame_id, curr_frame_id = profiler._del_node(level=self._level)
self._frame = sys._getframe(self._level)
self._frame.f_locals['__trace_id__'] = self._trace_id
if self._outer_frame_id:
self._frame.f_locals['__frame_id__'] = self._outer_frame_id
self._prev_trace_id = trace_id
self._prev__outer_frame_id = outer_frame_id
def __exit__(self, exc_type, exc_val, exc_tb):
if not profiler.tracing:
return
profiler._del_node(level=self._level)
self._frame.f_locals['__trace_id__'] = self._prev_trace_id
if self._outer_frame_id:
self._frame.f_locals['__frame_id__'] = self._prev__outer_frame_id
class LocalProfiler:
def __init__(self):
self.profiles = OrderedDict()
self._last_profile = OrderedDict()
self._last_part = -1
self.t_start = None
self.t_end = None
self.t_elapsed = None
now = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
self.filename = '%s.profile' % now
def clear(self):
self.profiles = OrderedDict()
self._last_profile = OrderedDict()
self._last_part = -1
self.t_start = None
self.t_end = None
self.t_elapsed = None
def start(self):
self.clear()
runtime = mosaic.runtime()
self.profiles[runtime.uid] = profiler.nodes
self.t_start = time.time()
# loop = mosaic.get_event_loop()
# loop.interval(self.append, filename=self.filename, interval=30)
def stop(self):
self.t_end = time.time()
self.t_elapsed = self.t_end - self.t_start
@skip_profile(stop_trace=True)
def update(self, sender_id, profiler_update):
if sender_id not in self.profiles:
self.profiles[sender_id] = OrderedDict()
self.profiles[sender_id] = dict_update(self.profiles[sender_id], profiler_update)
@skip_profile(stop_trace=True)
def dump(self, *args, **kwargs):
description = dict()
description['profiles'] = self.profiles
description['t_start'] = self.t_start
description['t_end'] = self.t_end
description['t_elapsed'] = self.t_elapsed
filename = kwargs.pop('filename')
with open(filename, 'wb') as file:
pickle.dump(description, file, protocol=pickle.HIGHEST_PROTOCOL)
self._last_profile = copy.deepcopy(self.profiles)
@skip_profile(stop_trace=True)
def load(self, *args, **kwargs):
filename = kwargs.pop('filename')
if filename.endswith('.parts'):
path = filename
filename = filename.split('/')[-1][:-6]
parts = glob.glob(os.path.join(path, '%s.*' % filename))
parts.sort()
for part in parts:
with open(part, 'rb') as file:
update = pickle.load(file)
self.t_start = update.get('t_start')
self.t_end = update.get('t_end')
self.t_elapsed = update.get('t_elapsed')
self.profiles = dict_update(self.profiles, update['profiles'])
else:
with open(filename, 'rb') as file:
update = pickle.load(file)
self.t_start = update.get('t_start')
self.t_end = update.get('t_end')
self.t_elapsed = update.get('t_elapsed')
self.profiles = update['profiles']
@skip_profile(stop_trace=True)
def append(self, *args, **kwargs):
path = '%s.parts' % self.filename
if not os.path.exists(path):
os.makedirs(path)
self._last_part += 1
filename = os.path.join(path, '%s.%d' % (self.filename, self._last_part))
description = dict()
description['profiles'] = self.get_update()
description['t_start'] = self.t_start
description['t_end'] = self.t_end
description['t_elapsed'] = self.t_elapsed
with open(filename, 'wb') as file:
pickle.dump(description, file, protocol=pickle.HIGHEST_PROTOCOL)
@skip_profile(stop_trace=True)
def get_update(self):
current_profile = self.profiles
profile_update = dict_diff(self._last_profile, current_profile)
for runtime_id, runtime in self.profiles.items():
for trace_id, trace_node in runtime.items():
if trace_id not in profile_update[runtime_id]:
profile_update[runtime_id][trace_id] = OrderedDict()
profile_update[runtime_id][trace_id]['t_end'] = trace_node.get('t_end')
profile_update[runtime_id][trace_id]['t_elapsed'] = trace_node.get('t_elapsed')
self._last_profile = current_profile
return profile_update
class RemoteProfiler:
def __init__(self, runtime_id='monitor'):
self._runtime_id = runtime_id
self._last_profile = OrderedDict()
@cached_property
def remote_runtime(self):
runtime = mosaic.runtime()
return runtime.proxy(self._runtime_id)
def clear(self):
self._last_profile = OrderedDict()
def start(self):
self.clear()
def stop(self):
pass
@skip_profile(stop_trace=True)
def update(self):
# start = time.time()
profiler_update = self.get_update()
if not len(profiler_update.keys()):
return
self.remote_runtime.recv_profile(profiler_update=profiler_update, as_async=False)
return profiler_update
@skip_profile(stop_trace=True)
def get_update(self):
current_profile = profiler.nodes
profiler_update = dict_diff(self._last_profile, current_profile)
for trace_id, trace_node in profiler.nodes.items():
if trace_id not in profiler_update:
profiler_update[trace_id] = OrderedDict()
profiler_update[trace_id]['t_end'] = trace_node.get('t_end')
profiler_update[trace_id]['t_elapsed'] = trace_node.get('t_elapsed')
self._last_profile = current_profile
return profiler_update
[docs]
class GlobalProfiler:
"""
The global profiler keeps the different endpoints of the runtime in contact,
so that local profiles can be consolidated into a single, global one.
"""
def __init__(self):
self.profiler = None
self.mode = None
[docs]
def start(self):
if self.profiler is not None:
self.profiler.start()
[docs]
def stop(self):
if self.profiler is not None:
self.profiler.stop()
[docs]
def set_local(self):
self.profiler = LocalProfiler()
self.mode = 'local'
[docs]
def set_remote(self, runtime_id='monitor'):
self.profiler = RemoteProfiler(runtime_id=runtime_id)
self.mode = 'remote'
[docs]
def send_profile(self):
if self.mode != 'remote':
return
self.profiler.update()
[docs]
def get_profile(self):
if self.mode != 'remote':
return
return self.profiler.get_update()
[docs]
def recv_profile(self, sender_id, profiler_update):
if self.mode != 'local':
return
self.profiler.update(sender_id, profiler_update)
[docs]
def dump(self, *args, **kwargs):
if self.mode != 'local':
return
self.profiler.dump(*args, **kwargs)
[docs]
def load(self, *args, **kwargs):
if self.mode != 'local':
return
self.profiler.load(*args, **kwargs)
[docs]
def append(self, *args, **kwargs):
if self.mode != 'local':
return
self.profiler.append(*args, **kwargs)
global_profiler = GlobalProfiler()
def _stop_tracing():
sys.settrace(None)
atexit.register(_stop_tracing)