import uuid
import copy
import time
from collections import OrderedDict
from ..file_manipulation import h5
__all__ = ['MonitoredResource', 'MonitoredObject']
[docs]
class MonitoredResource:
"""
Base class for those that keep track of the state of a Mosaic runtime,
"""
def __init__(self, uid):
self.uid = self.name = uid
self.history = []
self.sub_resources = dict()
self._last_update = 0
self._last_append = 0
@property
def state(self):
try:
return self.history[-1]['state']
except IndexError:
return None
@state.setter
def state(self, value):
last_event = copy.deepcopy(self.history[-1])
last_event['state'] = value
self.update(last_event)
[docs]
def update(self, update, **kwargs):
if not isinstance(update, list):
update = [update]
self.history.extend(update)
for group_name, group_update in kwargs.items():
self.add_group(group_name)
group = self.sub_resources[group_name]
for name, resource in group_update.items():
self.add_resource(group_name, name)
group[name].update(resource)
[docs]
def get_update(self):
history = self.history[self._last_update:]
self._last_update = len(self.history)
sub_resources = OrderedDict()
for group_name, group in self.sub_resources.items():
sub_resources[group_name] = OrderedDict()
for name, resource in group.items():
sub_resources[group_name][name] = resource.get_update()[0]
return history, sub_resources
[docs]
def append(self, filename=None):
history = self.history[self._last_append:]
node_description = {}
for index, item in enumerate(history):
label = 'history_%08d' % (self._last_append + index)
node_description[label] = item
self._last_append = len(self.history)
sub_description = {}
for group_name, group in self.sub_resources.items():
group_description = dict()
for name, resource in group.items():
resource_description = resource.append()
group_description[name] = resource_description
sub_description[group_name] = group_description
description = {
'history': node_description,
'sub_resources': sub_description,
}
if filename is None:
return description
if not h5.file_exists(filename=filename):
with h5.HDF5(filename=filename, mode='w') as file:
file.dump(description)
else:
with h5.HDF5(filename=filename, mode='a') as file:
file.append(description)
[docs]
def add_group(self, group_name):
if group_name not in self.sub_resources:
self.sub_resources[group_name] = OrderedDict()
return self.sub_resources[group_name]
[docs]
def add_resource(self, group_name, name):
group = self.sub_resources[group_name]
if name not in group:
group[name] = MonitoredResource(name)
return group[name]
[docs]
def sort_resources(self, group_name, key, desc=False):
group = self.sub_resources[group_name]
self.sub_resources[group_name] = OrderedDict(sorted(group.items(),
key=lambda x: x[1].history[-1][key],
reverse=desc))
[docs]
class MonitoredObject:
"""
Base class for those that keep track of the state of a Mosaic object,
"""
def __init__(self, runtime_id, uid, tessera_id=None):
self.uid = uid
self.runtime_id = runtime_id
if tessera_id is not None:
self.tessera_id = tessera_id
self.proxy_events = OrderedDict()
self.proxy_profiles = OrderedDict()
self.remote_events = []
self.remote_profiles = []
self._last_proxy_event = dict()
self._last_proxy_profile = dict()
self._last_remote_event = 0
self._last_remote_profile = 0
@property
def state(self):
try:
return self.remote_events[-1]['name']
except IndexError:
return None
@state.setter
def state(self, value):
event = {
'name': 'collected',
'event_t': time.time(),
}
self.remote_events.append(event)
[docs]
def collect(self):
event = {
'name': 'collected',
'event_t': time.time(),
}
if self.state != 'collected':
self.remote_events.append(event)
for runtime_id, proxy in self.proxy_events.items():
if proxy[-1]['name'] != 'collected':
proxy.append(event)
[docs]
def add_event(self, runtime_id, event_type, event_name, **kwargs):
event_uid = uuid.uuid4().hex
event = dict(name=event_name, event_uid=event_uid, **kwargs)
if event_type == 'proxy':
if runtime_id not in self.proxy_events:
self.proxy_events[runtime_id] = []
self.proxy_events[runtime_id].append(event)
elif event_type == 'remote':
self.remote_events.append(event)
[docs]
def add_profile(self, runtime_id, profile_type, profile):
if profile_type == 'proxy':
if runtime_id not in self.proxy_profiles:
self.proxy_profiles[runtime_id] = []
self.proxy_profiles[runtime_id].append(profile)
elif profile_type == 'remote':
self.remote_profiles.append(profile)
[docs]
def append(self, filename=None):
remote_events = self.remote_events[self._last_remote_event:]
remote_profiles = self.remote_profiles[self._last_remote_profile:]
remote_events_description = {}
for index, item in enumerate(remote_events):
label = 'history_%08d' % (self._last_remote_event + index)
remote_events_description[label] = item
remote_profiles_description = {}
for index, item in enumerate(remote_profiles):
label = 'history_%08d' % (self._last_remote_profile + index)
remote_profiles_description[label] = item
self._last_remote_event = len(self.remote_events)
self._last_remote_profile = len(self.remote_profiles)
proxy_events_description = {}
for proxy_name, proxy in self.proxy_events.items():
try:
last_event = self._last_proxy_event[proxy_name]
except KeyError:
last_event = 0
proxy_events = proxy[last_event:]
proxy_description = {}
for index, item in enumerate(proxy_events):
label = 'history_%08d' % (last_event + index)
proxy_description[label] = item
proxy_events_description[proxy_name] = proxy_description
self._last_proxy_event[proxy_name] = len(proxy)
proxy_profiles_description = {}
for proxy_name, proxy in self.proxy_profiles.items():
try:
last_profile = self._last_proxy_profile[proxy_name]
except KeyError:
last_profile = 0
proxy_profiles = proxy[last_profile:]
proxy_description = {}
for index, item in enumerate(proxy_profiles):
label = 'history_%08d' % (last_profile + index)
proxy_description[label] = item
proxy_profiles_description[proxy_name] = proxy_description
self._last_proxy_profile[proxy_name] = len(proxy)
description = {
'uid': self.uid,
'runtime_id': self.runtime_id,
'remote_events': remote_events_description,
'remote_profiles': remote_profiles_description,
'proxy_events': proxy_events_description,
'proxy_profiles': proxy_profiles_description,
}
if hasattr(self, 'tessera_id'):
description['tessera_id'] = self.tessera_id
if filename is None:
return description
if not h5.file_exists(filename=filename):
with h5.HDF5(filename=filename, mode='w') as file:
file.dump(description)
else:
with h5.HDF5(filename=filename, mode='a') as file:
file.append(description)