# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
import logging
import numpy
from six import string_types
from six.moves import xrange
import neo
import quantities
from spinn_utilities import logger_utils
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.log import FormatAdapter
from spinn_front_end_common.utilities.globals_variables import get_simulator
from spynnaker.pyNN.models.common import (
AbstractNeuronRecordable, AbstractSpikeRecordable)
from spynnaker.pyNN.models.recording_common import RecordingCommon
from spynnaker.pyNN.utilities.constants import (
SPIKES, MEMBRANE_POTENTIAL, GSYN_EXCIT, GSYN_INHIB)
from spynnaker.pyNN.exceptions import InvalidParameterType
from .data_cache import DataCache
from spynnaker8.utilities.version_util import pynn8_syntax
logger = FormatAdapter(logging.getLogger(__name__))
_DEFAULT_UNITS = {
SPIKES: "spikes",
MEMBRANE_POTENTIAL: "mV",
GSYN_EXCIT: "uS",
GSYN_INHIB: "uS"}
[docs]class Recorder(RecordingCommon):
# pylint: disable=protected-access
def __init__(self, population):
super(Recorder, self).__init__(population)
self._recording_start_time = get_simulator().t
self._data_cache = {}
def _extract_neo_block(self, variables, view_indexes, clear, annotations):
""" Extracts block from the vertices and puts them into a Neo block
:param variables: the variables to extract
:param view_indexes: the indexes to be included in the view
:param clear: if the variables should be cleared after reading
:param annotations: annotations to put on the Neo block
:return: The Neo block
"""
block = neo.Block()
for previous in range(0, get_simulator().segment_counter):
self._append_previous_segment(
block, previous, variables, view_indexes)
# add to the segments the new block
self._append_current_segment(block, variables, view_indexes, clear)
# add fluff to the neo block
block.name = self._population.label
block.description = self._population.describe()
block.rec_datetime = block.segments[0].rec_datetime
block.annotate(**self._metadata())
if annotations:
block.annotate(**annotations)
return block
def _get_units(self, variable):
""" Get units with some safety code if the population has trouble
:param variable: name of the variable
:type variable: str
:return: type of the data
:rtype: str
"""
try:
return self._population.find_units(variable)
except Exception:
logger.warning("Population: {} Does not support units for {}",
self._population.label, variable)
if variable in _DEFAULT_UNITS:
return _DEFAULT_UNITS[variable]
raise
[docs] def cache_data(self):
""" Store data for later extraction
"""
variables = self._get_all_recording_variables()
if variables:
segment_number = get_simulator().segment_counter
logger.info("Caching data for segment {:d}", segment_number)
data_cache = DataCache(
label=self._population.label,
description=self._population.describe(),
segment_number=segment_number,
recording_start_time=self._recording_start_time,
t=get_simulator().t)
for variable in variables:
if variable == SPIKES:
data = self._get_spikes()
sampling_interval = self._population._vertex. \
get_spikes_sampling_interval()
indexes = None
else:
results = self._get_recorded_matrix(variable)
(data, indexes, sampling_interval) = results
data_cache.save_data(
variable=variable, data=data, indexes=indexes,
n_neurons=self._population.size,
units=self._get_units(variable),
sampling_interval=sampling_interval)
self._data_cache[segment_number] = data_cache
def _filter_recorded(self, filter_ids):
record_ids = list()
for neuron_id in range(0, len(filter_ids)):
if filter_ids[neuron_id]:
# add population first ID to ensure all atoms have a unique
# identifier (PyNN enforcement)
record_ids.append(neuron_id + self._population.first_id)
return record_ids
def _clean_variables(self, variables):
""" Sorts out variables for processing usage
:param variables: list of variables names, or 'all', or single.
:return: ordered set of variables strings.
"""
# if variable is a base string, plonk into a array for ease of
# conversion
if isinstance(variables, string_types):
variables = [variables]
# if all are needed to be extracted, extract each and plonk into the
# neo block segment. ensures whatever was in variables stays in
# variables, regardless of all
if 'all' in variables:
variables = OrderedSet(variables)
variables.remove('all')
variables.update(self._get_all_recording_variables())
return variables
def _append_current_segment(self, block, variables, view_indexes, clear):
# build segment for the current data to be gathered in
segment = neo.Segment(
name="segment{}".format(get_simulator().segment_counter),
description=self._population.describe(),
rec_datetime=datetime.now())
# sort out variables for using
variables = self._clean_variables(variables)
for variable in variables:
if variable == SPIKES:
sampling_interval = self._population._vertex. \
get_spikes_sampling_interval()
self.read_in_spikes(
segment=segment,
spikes=self._get_spikes(),
t=get_simulator().get_current_time(),
n_neurons=self._population.size,
recording_start_time=self._recording_start_time,
sampling_interval=sampling_interval,
indexes=view_indexes,
label=self._population.label)
else:
(data, data_indexes, sampling_interval) = \
self._get_recorded_matrix(variable)
self.read_in_signal(
segment=segment,
block=block,
signal_array=data,
data_indexes=data_indexes,
view_indexes=view_indexes,
variable=variable,
recording_start_time=self._recording_start_time,
sampling_interval=sampling_interval,
units=self._get_units(variable),
label=self._population.label)
block.segments.append(segment)
if clear:
self._clear_recording(variables)
def _append_previous_segment(
self, block, segment_number, variables, view_indexes):
if segment_number not in self._data_cache:
logger.warning("No Data available for Segment {}", segment_number)
segment = neo.Segment(
name="segment{}".format(segment_number),
description="Empty",
rec_datetime=datetime.now())
return segment
data_cache = self._data_cache[segment_number]
# sort out variables
variables = self._clean_variables(variables)
# build segment for the previous data to be gathered in
segment = neo.Segment(
name="segment{}".format(segment_number),
description=data_cache.description,
rec_datetime=data_cache.rec_datetime)
for variable in variables:
if variable not in data_cache.variables:
logger.warning("No Data available for Segment {} variable {}",
segment_number, variable)
continue
variable_cache = data_cache.get_data(variable)
if variable == SPIKES:
self.read_in_spikes(
segment=segment,
spikes=variable_cache.data,
t=data_cache.t,
n_neurons=variable_cache.n_neurons,
recording_start_time=data_cache.recording_start_time,
sampling_interval=variable_cache.sampling_interval,
indexes=view_indexes,
label=data_cache.label)
else:
self.read_in_signal(
segment=segment,
block=block,
signal_array=variable_cache.data,
data_indexes=variable_cache.indexes,
view_indexes=view_indexes,
variable=variable,
recording_start_time=data_cache.recording_start_time,
sampling_interval=variable_cache.sampling_interval,
units=variable_cache.units,
label=data_cache.label)
block.segments.append(segment)
def _get_all_possible_recordable_variables(self):
variables = OrderedSet()
if isinstance(self._population._vertex, AbstractSpikeRecordable):
variables.add(SPIKES)
if isinstance(self._population._vertex, AbstractNeuronRecordable):
variables.update(
self._population._vertex.get_recordable_variables())
return variables
def _get_all_recording_variables(self):
possibles = self._get_all_possible_recordable_variables()
variables = OrderedSet()
for possible in possibles:
if possible == SPIKES:
if isinstance(self._population._vertex,
AbstractSpikeRecordable) \
and self._population._vertex.is_recording_spikes():
variables.add(possible)
elif isinstance(self._population._vertex,
AbstractNeuronRecordable) and \
self._population._vertex.is_recording(possible):
variables.add(possible)
return variables
def _metadata(self):
metadata = {
'size': self._population.size,
'first_index': 0,
'last_index': self._population.size,
'first_id': int(self._population.first_id),
'last_id': int(self._population.last_id),
'label': self._population.label,
'simulator': get_simulator().name,
}
metadata.update(self._population._annotations)
metadata['dt'] = get_simulator().dt
metadata['mpi_processes'] = get_simulator().num_processes
return metadata
def _clear_recording(self, variables):
for variable in variables:
if variable == SPIKES:
self._population._vertex.clear_spike_recording(
get_simulator().buffer_manager,
get_simulator().placements,
get_simulator().graph_mapper)
elif variable == MEMBRANE_POTENTIAL:
self._population._vertex.clear_v_recording(
get_simulator().buffer_manager,
get_simulator().placements,
get_simulator().graph_mapper)
elif variable == GSYN_EXCIT:
self._population._vertex.clear_gsyn_inhibitory_recording(
get_simulator().buffer_manager,
get_simulator().placements,
get_simulator().graph_mapper)
elif variable == GSYN_INHIB:
self._population._vertex.clear_gsyn_excitatory_recording(
get_simulator().buffer_manager,
get_simulator().placements,
get_simulator().graph_mapper)
else:
raise InvalidParameterType(
"The variable {} is not a recordable value".format(
variable))
[docs] def read_in_spikes(
self, segment, spikes, t, n_neurons, recording_start_time,
sampling_interval, indexes, label):
""" Converts the data into SpikeTrains and saves them to the segment.
:param segment: Segment to add spikes to
:type segment: neo.Segment
:param spikes: Spike data in raw sPyNNaker format
:type spikes: nparray
:param t: last simulation time
:type t: int
:param n_neurons: total number of neurons including ones not recording
:type n_neurons: int
:param recording_start_time: time recording started
:type recording_start_time: int
:param sampling_interval: how often a neuron is recorded
:param label: recording elements label
:type label: str
"""
# pylint: disable=too-many-arguments
# Safety check in case spikes are an empty list
if len(spikes) == 0:
spikes = numpy.empty(shape=(0, 2))
t_stop = t * quantities.ms
if indexes is None:
indexes = xrange(n_neurons)
for index in indexes:
spiketrain = neo.SpikeTrain(
times=spikes[spikes[:, 0] == index][:, 1],
t_start=recording_start_time,
t_stop=t_stop,
units='ms',
sampling_interval=sampling_interval,
source_population=label,
source_id=self._population.index_to_id(index),
source_index=index)
# get times per atom
segment.spiketrains.append(spiketrain)
[docs] def read_in_signal(
self, segment, block, signal_array, data_indexes, view_indexes,
variable, recording_start_time, sampling_interval, units, label):
""" Reads in a data item that's not spikes (likely v, gsyn e, gsyn i)\
and saves this data to the segment.
:param segment: Segment to add data to
:type segment: neo.Segment
:param block: neo block
:type block: neo.Block
:param signal_array: the raw signal data
:type signal_array: nparray
:param data_indexes: The indexes for the recorded data
:type data_indexes: list(int)
:param view_indexes: The indexes for which data should be returned.\
If None all data (view_index = data_indexes)
:type view_indexes: list(int)
:param variable: the variable name
:param recording_start_time: when recording started
:param sampling_interval: how often a neuron is recorded
:param units: the units of the recorded value
:param label: human readable label
"""
# pylint: disable=too-many-arguments, no-member
t_start = recording_start_time * quantities.ms
sampling_period = sampling_interval * quantities.ms
if view_indexes is None:
if not numpy.array_equal(data_indexes, self._all_ids):
msg = "Warning getting data on a whole population when " \
"selective recording is active will result in only " \
"the requested neurons being returned in numerical " \
"order and without repeats."
logger_utils.warn_once(logger, msg)
indexes = numpy.array(data_indexes)
elif view_indexes == data_indexes:
indexes = numpy.array(data_indexes)
else:
# keep just the view indexes in the data
indexes = [i for i in view_indexes if i in data_indexes]
# keep just data columns in the view
map_indexes = [data_indexes.index(i) for i in indexes]
signal_array = signal_array[:, map_indexes]
ids = list(map(self._population.index_to_id, indexes))
if pynn8_syntax:
data_array = neo.AnalogSignalArray(
signal_array,
units=units,
t_start=t_start,
sampling_period=sampling_period,
name=variable,
source_population=label,
channel_index=indexes,
source_ids=ids)
data_array.shape = (data_array.shape[0], data_array.shape[1])
segment.analogsignalarrays.append(data_array)
else:
data_array = neo.AnalogSignal(
signal_array,
units=units,
t_start=t_start,
sampling_period=sampling_period,
name=variable,
source_population=label,
source_ids=ids)
channel_index = _get_channel_index(indexes, block)
data_array.channel_index = channel_index
data_array.shape = (data_array.shape[0], data_array.shape[1])
segment.analogsignals.append(data_array)
channel_index.analogsignals.append(data_array)
def _get_channel_index(ids, block):
# Note this code is only called if not pynn8_syntax
for channel_index in block.channel_indexes:
if numpy.array_equal(channel_index.index, ids):
return channel_index
count = len(block.channel_indexes)
channel_index = neo.ChannelIndex(
name="Index {}".format(count), index=ids)
block.channel_indexes.append(channel_index)
return channel_index
def _convert_extracted_data_into_neo_expected_format(
signal_array, indexes):
""" Converts data between sPyNNaker format and Neo format
:param signal_array: Draw data in sPyNNaker format
:type signal_array: nparray
:rtype nparray
"""
processed_data = [
signal_array[:, 2][signal_array[:, 0] == index]
for index in indexes]
processed_data = numpy.vstack(processed_data).T
return processed_data
def _add_pynn9_signal_chunk(
segment, processed_data, units, t_start, sampling_period, variable,
label, ids, block):
# pylint: disable=too-many-arguments
source_ids = numpy.fromiter(ids, dtype=int)
data_array = neo.AnalogSignal(
processed_data,
units=units,
t_start=t_start,
sampling_period=sampling_period,
name=variable,
source_population=label,
source_ids=source_ids)
channel_index = _get_channel_index(ids, block)
data_array.channel_index = channel_index
data_array.shape = (data_array.shape[0], data_array.shape[1])
segment.analogsignals.append(data_array)
channel_index.analogsignals.append(data_array)