Source code for stride.problem.acquisitions


import functools
import numpy as np
from collections import OrderedDict
from cached_property import cached_property

from mosaic.file_manipulation import h5

from .data import Traces
from .base import ProblemBase
from .. import plotting


__all__ = ['Shot', 'Sequence', 'Acquisitions']


def _select_slice(selection, init_ids,
                  start=None, end=None, num=None, every=1, randomly=False):

    if not len(selection):
        sel_slice = slice(start or 0, end)
        item_ids = init_ids
        item_ids.sort()
        item_ids = item_ids[sel_slice]

        if randomly is True:
            selection = np.random.permutation(item_ids).tolist()

        else:
            num_groups = int(np.ceil(len(item_ids) / num))

            selection = []

            for group_index in range(num_groups):

                group = []
                num_remaining = len(item_ids)

                start_index = group_index if every > 1 else 0
                for index in range(start_index, num_remaining, every):
                    group.append(item_ids[index])

                    if len(group) == num or not len(item_ids):
                        break

                selection += group
                item_ids = item_ids if every > 1 else list(set(item_ids) - set(group))

    next_slice = selection[:num]
    selection = selection[num:]

    next_slice.sort()

    return next_slice, selection


[docs] class Shot(ProblemBase): """ A Shot is an even in which one or more transducers act as sources with a given wavelet and one or more transducers act as receivers and record some observed data. Therefore a shot object maintains data about the ids of the transducer locations that will act as sources, the ids of the transducer locations that will act as receivers, as well as the wavelets that will be fired and the observed data that is recorded. Parameters ---------- id : int Identifier assigned to this shot. name : str Optional name for the shot. problem : Problem Problem to which the Shot belongs. geometry : Geometry Geometry referenced by the source/receiver transducer locations of the shot. sources : list Sources with which to initialise the shot, defaults to empty. receivers : list Receivers with which to initialise the shot, defaults to empty. delays : ndarray, optional Array of delays (in seconds) for each source in the shot. grid : Grid or any of Space or Time Grid on which the Acquisitions is defined """ def __init__(self, id, name=None, problem=None, **kwargs): name = name or 'shot_%05d' % id super().__init__(name=name, problem=problem, **kwargs) if id < 0: raise ValueError('The shot needs a positive ID') self.id = id self.frame = -1 self.acq = -1 self.sequence_id = None if problem is not None: geometry = problem.geometry else: geometry = kwargs.pop('geometry', None) self._geometry = geometry self._acquisitions = None self._sequence = None self._sources = OrderedDict() self._receivers = OrderedDict() self.wavelets = None self.observed = None sources = kwargs.pop('sources', None) receivers = kwargs.pop('receivers', None) delays = kwargs.pop('delays', None) if sources is not None and receivers is not None: for source in sources: self._sources[source.id] = source for receiver in receivers: self._receivers[receiver.id] = receiver self.wavelets = Traces(name='wavelets', transducer_ids=self.source_ids, grid=self.grid) self.observed = Traces(name='observed', transducer_ids=self.receiver_ids, grid=self.grid) self.delays = Traces(name='delays', transducer_ids=self.source_ids, shape=(len(sources), 1), grid=self.grid) if delays is not None: self.delays.data[:, 0] = delays @property def geometry(self): return self._geometry @property def source_ids(self): """ Get ids of sources in this Shot in a list. """ return list(self._sources.keys()) @property def receiver_ids(self): """ Get ids of receivers in this Shot in a list. """ return list(self._receivers.keys()) @property def sources(self): """ Get sources in this Shot as a list. """ return list(self._sources.values()) @property def receivers(self): """ Get receivers in this Shot as a list. """ return list(self._receivers.values()) @property def slow_time_index(self): """ Point along the slow time. """ if self.frame < 0: raise RuntimeError('Shot %d does not have an assigned frame.' % self.id) return self.frame * self.slow_time.num_acq + self.acq @cached_property def delayed_wavelets(self): """ Get wavelets with delays applied to them. """ if not self.delays.allocated: return self.wavelets delays = np.round(self.delays.data[:, 0] / self.time.step).astype(int) wavelets = self.wavelets.data.copy() for source_idx in range(self.num_sources): delay = delays[source_idx] if delay <= 0: continue wavelet = wavelets[source_idx, :self.time.num - delay] wavelet = np.pad(wavelet, ((delay, 0),), mode='constant', constant_values=0) wavelets[source_idx, :] = wavelet delayed_wavelets = self.wavelets.alike(name='wavelets') delayed_wavelets.data[:] = wavelets return delayed_wavelets @property def num_sources(self): """ Get number of sources in the Shot. """ return len(self.source_ids) @property def num_points_sources(self): """ Get total number of point sources in the Shot. """ return len(self.source_ids) @property def num_receivers(self): """ Get number of receivers in the Shot. """ return len(self.receiver_ids) @property def num_points_receivers(self): """ Get total number of point receivers in the Shot. """ return len(self.receiver_ids) @property def source_coordinates(self): """ Get the coordinates of all sources packed in an array format. Coordinates are defined as a 2 or 3-dimensional array with shape (n_sources, n_dimensions). """ coordinates = np.zeros((self.num_sources, self.space.dim), dtype=np.float32) source_index = 0 for source in self.sources: coordinates[source_index, :] = source.coordinates source_index += 1 return coordinates @property def receiver_coordinates(self): """ Get the coordinates of all receivers packed in an array format. Coordinates are defined as a 2 or 3-dimensional array with shape (n_receivers, n_dimensions). """ coordinates = np.zeros((self.num_receivers, self.space.dim), dtype=np.float32) receiver_index = 0 for receiver in self.receivers: coordinates[receiver_index, :] = receiver.coordinates receiver_index += 1 return coordinates
[docs] def sub_problem(self, shot, sub_problem): """ Create a subset object for a certain shot. A SubProblem contains everything that is needed to fully determine how to run a particular shot. This method takes care of generating a new Shot object that is linked to this new SubProblem. Parameters ---------- shot : Shot Shot for which the SubProblem is being generated. sub_problem : SubProblem Container for the sub-problem being generated. Returns ------- Shot Newly created Shot instance. """ shot = Shot(self.id, name=self.name, problem=sub_problem, grid=self.grid, geometry=sub_problem.geometry) for source_id in self.source_ids: location = sub_problem.geometry.get(source_id) shot._sources[location.id] = location for receiver_id in self.receiver_ids: location = sub_problem.geometry.get(receiver_id) shot._receivers[location.id] = location if self.wavelets is not None: shot.wavelets = self.wavelets if self.observed is not None: shot.observed = self.observed if self.delays is not None: shot.delays = self.delays shot.frame = self.frame shot.acq = self.acq shot.sequence_id = self.sequence_id return shot
[docs] def plot(self, **kwargs): """ Plot wavelets and observed for this shot if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- axes Axes on which the plotting is done. """ axes = [] if self.wavelets is not None and self.wavelets.allocated: axes.append(self.wavelets.plot(**kwargs)) if self.observed is not None and self.observed.allocated: axes.append(self.observed.plot(**kwargs)) return axes
[docs] def plot_wavelets(self, **kwargs): """ Plot wavelets for this shot if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- axes Axes on which the plotting is done. """ if self.wavelets is not None and self.wavelets.allocated: return self.delayed_wavelets.plot(**kwargs)
[docs] def plot_observed(self, **kwargs): """ Plot observed for this shot if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- axes Axes on which the plotting is done. """ if self.observed is not None and self.observed.allocated: return self.observed.plot(**kwargs)
[docs] def append_observed(self, *args, **kwargs): """ Append the shot to the corresponding Acquisitions file. See :class:`~mosaic.file_manipulation.h5.HDF5` for more information on the parameters of this method. Returns ------- """ kwargs['parameter'] = 'acquisitions' kwargs['version'] = kwargs.get('version', 0) if h5.file_exists(*args, **kwargs): description = { 'shots': { 'shots_%08d' % self.id: self.__get_desc__(**kwargs) } } with h5.HDF5(*args, **kwargs, mode='a') as file: file.append(description) else: self._acquisitions.dump(*args, **kwargs)
def __get_desc__(self, **kwargs): description = { 'id': self.id, 'num_sources': self.num_sources, 'num_receivers': self.num_receivers, 'source_ids': self.source_ids, 'receiver_ids': self.receiver_ids, } if self.wavelets is not None and self.wavelets.allocated: description['wavelets'] = self.wavelets.__get_desc__() if self.observed is not None and self.observed.allocated: description['observed'] = self.observed.__get_desc__() if self.delays is not None and self.delays.allocated: description['delays'] = self.delays.__get_desc__() return description def __set_desc__(self, description): self.id = description.id for source_id in description.source_ids: if self._geometry: source = self._geometry.get(source_id) else: source = None self._sources[source_id] = source for receiver_id in description.receiver_ids: if self._geometry: receiver = self._geometry.get(receiver_id) else: receiver = None self._receivers[receiver_id] = receiver self.wavelets = Traces(name='wavelets', transducer_ids=self.source_ids, grid=self.grid) if 'wavelets' in description: self.wavelets.__set_desc__(description.wavelets) self.observed = Traces(name='observed', transducer_ids=self.receiver_ids, grid=self.grid) if 'observed' in description: self.observed.__set_desc__(description.observed) self.delays = Traces(name='delays', transducer_ids=self.source_ids, shape=(len(self.source_ids), 1), grid=self.grid) if 'delays' in description: self.delays.__set_desc__(description.delays)
[docs] class Sequence(ProblemBase): """ A Sequence represents a set of shots that are fired at different times, defined with respect to a slow time axis. Parameters ---------- id : int Identifier assigned to this sequence. acq : int, optional Acquisition number at which the sequence starts, defaults to 0. name : str Optional name for the sequence. problem : Problem Problem to which the Sequence belongs. geometry : Geometry Geometry referenced by the source/receiver transducer locations of the shots. acquisitions : Acquisitions Acquisitions to which the sequence belongs. grid : Grid or any of Space or Time Grid on which the Acquisitions is defined """ def __init__(self, id, acq=0, name=None, problem=None, **kwargs): name = name or 'sequence_%05d' % id super().__init__(name=name, problem=problem, **kwargs) if id < 0: raise ValueError('The shot needs a positive ID') self.id = id self.acq = acq if problem is not None: geometry = problem.geometry else: geometry = kwargs.pop('geometry', None) if problem is not None: acquisitions = problem.acquisitions else: acquisitions = kwargs.pop('acquisitions', None) self._geometry = geometry self._acquisitions = acquisitions self._shots = OrderedDict() self._shot_selection = [] @property def shots(self): """ Get all shots in the Sequence as a list. """ return list(self._shots.values()) @property def shot_ids(self): """ Get all shot IDs in the Sequence as a list. """ return [each.id for each in self._shots.values()] @property def shot_starts(self): """ Get all starts of shots in the Sequence as a list. """ return list(self._shots.keys()) @property def num_shots(self): """ Get number of shots in the Sequence. """ return len(self.shot_starts)
[docs] def add(self, frame, item): """ Add a new shot to the Sequence. Parameters ---------- frame : int Frame number where the shot is fired. item : Shot Shot to be added to the Sequence. Returns ------- """ if frame in self._shots.keys(): raise ValueError('Shot with ID "%d" starting in frame "%d" ' 'already exists in the Sequence' % (item.id, frame)) self._shots[frame] = item item._sequence = self item.frame = frame item.acq = self.acq item.sequence_id = self.id
[docs] def get(self, frame): """ Get a shot from the Acquisitions with a known frame. Parameters ---------- frame : int Frame number of the shot in the Sequence. Returns ------- Shot Found Shot. """ if isinstance(frame, (np.int32, np.int64)): frame = int(frame) if not isinstance(frame, int) or frame < 0: raise ValueError('Shot frames have to be positive integer numbers') return self._shots[frame]
[docs] def set(self, frame, item): """ Change an existing shot in the Sequence. Parameters ---------- frame : int Frame number of the shot in the Sequence. item : Shot Shot to be modified in the Acquisitions. Returns ------- """ if frame not in self._shots.keys(): raise ValueError('Shot with ID "%d" starting in frame "%d" ' 'does not exists in the Sequence' % (item.id, frame)) self._shots[frame] = item
[docs] def select_shot_ids(self, shot_ids=None, start=None, end=None, num=None, every=1, randomly=False): """ Select a number of shots according to the rules given in the arguments to the method. For every call to this method a new group of shots will be selected according to those rules until all shots have been selected. At that point, the selection will start again. Parameters ---------- shot_ids : list, optional List of shot IDs to select from. start : int, optional Start of the slice, defaults to the first time step. end : int, optional End of the slice, defaults to the last time step. num : int, optional Number of shots to select every time the method is called. every : int, optional How many shots to skip in the selection, defaults to 1, which means taking all shots subsequently. randomly : bool, optional Whether to select the shots at random at in order, defaults to False. Returns ------- list List with selected shots. """ shot_starts = self.shot_starts if shot_ids is None else shot_ids next_slice, selection = _select_slice(self._shot_selection, shot_starts, start=start, end=end, num=num, every=every, randomly=randomly) self._shot_selection = selection return next_slice
def __get_desc__(self, **kwargs): description = { 'id': self.id, 'acq': self.acq, 'num_shots': self.num_shots, 'shots': [], } for frame, shot in self._shots.items(): description['shots'].append({ 'frame': frame, 'id': shot.id }) return description def __set_desc__(self, description): self.id = description.id self.acq = description.acq for shot_desc in description.shots: if shot_desc.frame not in self._shots: shot = self._acquisitions.get(shot_desc.id) self.add(shot_desc.frame, shot)
[docs] class Acquisitions(ProblemBase): """ Acquisitions establish a series of shots that will be or have been fired to generate data. A shot is an even in which one or more transducer locations act as sources with a given wavelet and one or more transducer locations act as receivers and record some observed data. Shots are identified through a numerical ID, which is >= 0. Shots can be added through ``Acquisitions.add(shot)`` and can be accessed through ``Acquisitions.get(shot_id)``. The Acquisitions also provides utilities for loading and dumping these shots and their data. Parameters ---------- name : str Alternative name to give to the medium. problem : Problem Problem to which the Acquisitions belongs. geometry : Geometry Geometry referenced by the source/receiver transducer locations of the shot. grid : Grid or any of Space or Time Grid on which the Acquisitions is defined """ def __init__(self, name='acquisitions', problem=None, **kwargs): super().__init__(name=name, problem=problem, **kwargs) if problem is not None: geometry = problem.geometry else: geometry = kwargs.pop('geometry', None) self._geometry = geometry self._shots = OrderedDict() self._sequences = OrderedDict() self._shot_selection = [] self._sequence_selection = [] @property def shots(self): """ Get all shots in the Acquisitions as a list. """ return list(self._shots.values()) @property def sequences(self): """ Get all sequences in the Acquisitions as a list. """ return list(self._sequences.values()) @property def shot_ids(self): """ Get all IDs of shots in the Acquisitions as a list. """ return list(self._shots.keys()) @property def sequence_ids(self): """ Get all IDs of sequences in the Acquisitions as a list. """ return list(self._sequences.keys()) @property def num_shots(self): """ Get number of shots in the Acquisitions. """ return len(self.shot_ids) @property def num_sequences(self): """ Get number of sequences in the Acquisitions. """ return len(self.sequence_ids) @property def num_sources_per_shot(self): """ Get maximum number of sources in any shot. """ num_transducers = max(*[each.num_sources for each in self._shots.values()]) return num_transducers @property def num_receivers_per_shot(self): """ Get maximum number of receivers in any shot. """ num_transducers = max(*[each.num_receivers for each in self._shots.values()]) return num_transducers @property def remaining_shots(self): """ Get dict of all shots that have no observed allocated. """ shots = OrderedDict() for shot_id, shot in self._shots.items(): if not shot.observed.allocated: shots[shot_id] = shot return shots @property def remaining_shot_ids(self): """ Get list of all shot IDs that have no observed allocated. """ shot_ids = [] for shot_id, shot in self._shots.items(): if not shot.observed.allocated: shot_ids.append(shot_id) return shot_ids
[docs] def add(self, item): """ Add a new shot to the Acquisitions. Parameters ---------- item : Shot Shot to be added to the Acquisitions. Returns ------- """ if item.id in self._shots.keys(): raise ValueError('Shot with ID "%d" already exists in the Acquisitions' % item.id) self._shots[item.id] = item item._acquisitions = self
[docs] def add_sequence(self, item): """ Add a new sequence to the Acquisitions. Parameters ---------- item : Sequence Sequence to be added to the Acquisitions. Returns ------- """ if item.id in self._sequences.keys(): raise ValueError('Sequence with ID "%d" already exists in the Acquisitions' % item.id) self._sequences[item.id] = item item._acquisitions = self
[docs] def get(self, id): """ Get a shot from the Acquisitions with a known id. Parameters ---------- id : int Identifier of the shot. Returns ------- Shot Found Shot. """ if isinstance(id, (np.int32, np.int64)): id = int(id) if not isinstance(id, int) or id < 0: raise ValueError('Shot IDs have to be positive integer numbers') return self._shots[id]
[docs] def get_sequence(self, id): """ Get a sequence from the Acquisitions with a known id. Parameters ---------- id : int Identifier of the shot. Returns ------- Sequence Found Sequence. """ if isinstance(id, (np.int32, np.int64)): id = int(id) if not isinstance(id, int) or id < 0: raise ValueError('Sequence IDs have to be positive integer numbers') return self._sequences[id]
[docs] def set(self, item): """ Change an existing shot in the Acquisitions. Parameters ---------- item : Shot Shot to be modified in the Acquisitions. Returns ------- """ if item.id not in self._shots.keys(): raise ValueError('Shot with ID "%d" does not exist in the Acquisitions' % item.id) self._shots[item.id] = item
[docs] def set_sequence(self, item): """ Change an existing sequence in the Acquisitions. Parameters ---------- item : Sequence Sequence to be modified in the Acquisitions. Returns ------- """ if item.id not in self._sequences.keys(): raise ValueError('Sequence with ID "%d" does not exist in the Acquisitions' % item.id) self._sequences[item.id] = item
[docs] def reset_selection(self): """ Reset the shot and sequence selection. Returns ------- """ self._shot_selection = [] self._sequence_selection = []
[docs] def select_shot_ids(self, shot_ids=None, start=None, end=None, num=None, every=1, randomly=False): """ Select a number of shots according to the rules given in the arguments to the method. For every call to this method a new group of shots will be selected according to those rules until all shots have been selected. At that point, the selection will start again. Parameters ---------- shot_ids : list, optional List of shot IDs to select from. start : int, optional Start of the slice, defaults to the first id. end : int, optional End of the slice, defaults to the last id. num : int, optional Number of shots to select every time the method is called. every : int, optional How many shots to skip in the selection, defaults to 1, which means taking all shots subsequently. randomly : bool, optional Whether to select the shots at random at in order, defaults to False. Returns ------- list List with selected shots. """ shot_ids = self.shot_ids if shot_ids is None else shot_ids next_slice, selection = _select_slice(self._shot_selection, shot_ids, start=start, end=end, num=num, every=every, randomly=randomly) self._shot_selection = selection return next_slice
[docs] def select_sequence_ids(self, start=None, end=None, num=None, every=1, randomly=False): """ Select a number of sequences according to the rules given in the arguments to the method. For every call to this method a new group of sequences will be selected according to those rules until all shots have been selected. At that point, the selection will start again. Parameters ---------- start : int, optional Start of the slice, defaults to the first id. end : int, optional End of the slice, defaults to the last id. num : int, optional Number of shots to select every time the method is called. every : int, optional How many shots to skip in the selection, defaults to 1, which means taking all shots subsequently. randomly : bool, optional Whether to select the shots at random at in order, defaults to False. Returns ------- list List with selected sequences. """ next_slice, selection = _select_slice(self._sequence_selection, self.sequence_ids, start=start, end=end, num=num, every=every, randomly=randomly) self._sequence_selection = selection return next_slice
[docs] def default(self): """ Fill the container with the default configuration. In this case, that means that every location in the Geometry acts as a source once while every location acts as a receiver. This generates as many shots as there are locations available in the Geometry. Each Shot only has one source and as many receivers as locations are in the Geometry. Returns ------- """ for source in self._geometry.locations: receivers = self._geometry.locations self.add(Shot(source.id, sources=[source], receivers=receivers, geometry=self._geometry, problem=self.problem))
[docs] def plot(self, **kwargs): """ Plot wavelets and observed for for all shots if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- """ plot = kwargs.pop('plot', True) self.plot_wavelets(**kwargs) self.plot_observed(**kwargs) if plot is True: plotting.show()
def _plot(self, update, **kwargs): try: import matplotlib.pyplot as plt from matplotlib.widgets import Slider except ModuleNotFoundError: return None axis = kwargs.pop('axis', None) if axis is not None: figure = axis.get_figure() else: figure, axis = plt.subplots(1, 1) plt.subplots_adjust(bottom=0.25) axis.margins(x=0) ax_shot = plt.axes([0.15, 0.1, 0.7, 0.03]) if self.num_shots > 1: shot_step = self.shot_ids[1]-self.shot_ids[0] else: shot_step = 1 shot_range = kwargs.pop('shot_range', (self.shot_ids[0], self.shot_ids[-1])) shot_step = kwargs.pop('shot_step', shot_step) slider = Slider(ax_shot, 'shot ID', shot_range[0], shot_range[1], valinit=self.shot_ids[0], valstep=shot_step) update = functools.partial(update, figure, axis) update(self.shot_ids[0]) slider.on_changed(update) axis.slider = slider return axis
[docs] def plot_wavelets(self, **kwargs): """ Plot wavelets for for all shots if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- """ if not self.num_shots or not self.get(self.shot_ids[0]).wavelets.allocated: return None kwargs['plot'] = False def update(figure, axis, shot_id): kwargs.pop('axis', None) kwargs.pop('shot_range', None) kwargs.pop('shot_step', None) axis.clear() self.get(int(shot_id)).plot_wavelets(axis=axis, **kwargs) axis.set_title(axis.get_title() + ' - shot %d' % shot_id) figure.canvas.draw_idle() return self._plot(update, **kwargs)
[docs] def plot_observed(self, **kwargs): """ Plot observed for for all shots if they are allocated. Parameters ---------- kwargs Arguments for plotting. Returns ------- """ if not self.num_shots or not self.get(self.shot_ids[0]).observed.allocated: return None kwargs['plot'] = False def update(figure, axis, shot_id): kwargs.pop('axis', None) kwargs.pop('shot_range', None) kwargs.pop('shot_step', None) axis.clear() self.get(int(shot_id)).plot_observed(axis=axis, **kwargs) axis.set_title(axis.get_title() + ' - shot %d' % shot_id) figure.canvas.draw_idle() return self._plot(update, **kwargs)
[docs] def sub_problem(self, shot, sub_problem): """ Create a subset object for a certain shot. A SubProblem contains everything that is needed to fully determine how to run a particular shot. This method takes care of selecting the portions of the Acquisitions that are needed for a given shot. Parameters ---------- shot : Shot Shot for which the SubProblem is being generated. sub_problem : SubProblem Container for the sub-problem being generated. Returns ------- Acquisitions Newly created Acquisitions instance. """ sub_acquisitions = Acquisitions(name=self.name, geometry=sub_problem.geometry, problem=sub_problem, grid=self.grid) sub_acquisitions.add(shot) return sub_acquisitions
def __get_desc__(self, **kwargs): description = { 'num_shots': self.num_shots, 'shots': [], 'sequences': [], } for shot in self.shots: description['shots'].append(shot.__get_desc__()) for sequence in self.sequences: description['sequences'].append(sequence.__get_desc__()) return description def __set_desc__(self, description): for shot_desc in description.shots: if shot_desc.id not in self._shots: shot = Shot(shot_desc.id, geometry=self._geometry, problem=self.problem, grid=self.grid) self.add(shot) shot = self.get(shot_desc.id) shot.__set_desc__(shot_desc) if 'sequences' in description: for seq_desc in description.sequences: if seq_desc.id not in self._sequences: sequence = Sequence(seq_desc.id, geometry=self._geometry, problem=self.problem, grid=self.grid) self.add_sequence(sequence) sequence = self.get_sequence(seq_desc.id) sequence.__set_desc__(seq_desc)