Source code for spynnaker.pyNN.external_devices_models.push_bot.push_bot_parameters.push_bot_retina_viewer
# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import math
import socket
from threading import Thread
import numpy
# Value of brightest pixel to show
_DISPLAY_MAX = 33.0
# How regularity to display frames
_FRAME_TIME_MS = 10
# Time constant of pixel decay
_DECAY_TIME_CONSTANT_MS = 100
_BUFFER_SIZE = 512
[docs]class PushBotRetinaViewer(Thread):
""" A viewer for the pushbot's retina. This is a thread that can be \
launched in parallel with the control code.
Based on matplotlib
"""
def __init__(
self, resolution, port=0, display_max=_DISPLAY_MAX,
frame_time_ms=_FRAME_TIME_MS,
decay_time_constant_ms=_DECAY_TIME_CONSTANT_MS):
# pylint: disable=too-many-arguments
try:
from matplotlib import pyplot # NOQA
from matplotlib import animation # NOQA
self.__pyplot = pyplot
self.__animation = animation
except ImportError:
raise Exception("matplotlib must be installed to use this viewer")
super(PushBotRetinaViewer, self).__init__(name="PushBotRetinaViewer")
self.__display_max = display_max
self.__frame_time_ms = frame_time_ms
self.__image = None
self.__ani = None
# Open socket to receive UDP
self._init_socket(port)
# Determine mask for coordinates
self.__coordinate_mask = \
(1 << (2 * resolution.bits_per_coordinate)) - 1
# Set up the image
self.__image_data = numpy.zeros(resolution.pixels * resolution.pixels)
self.__image_data_view = self.__image_data.view()
self.__image_data_view.shape = (resolution.pixels, resolution.pixels)
# Calculate decay proportion each frame
self.__decay_proportion = math.exp(
-float(self.__frame_time_ms) / float(decay_time_constant_ms))
def _init_socket(self, port):
""" Open socket to receive UDP.
"""
self.__spike_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__spike_socket.bind(("0.0.0.0", port))
self.__spike_socket.setblocking(False)
self.__local_host, self.__local_port = \
self.__spike_socket.getsockname()
@property
def local_host(self):
return self.__local_host
@property
def local_port(self):
return self.__local_port
def __recv_data(self, size=_BUFFER_SIZE):
return self.__spike_socket.recv(size)
def _close(self):
self.__spike_socket.close()
def _parse_raw_data(self, raw_data):
# Slice off EIEIO header and timestamp, and convert to numpy
# array of uint32
payload = numpy.fromstring(raw_data[6:], dtype="uint32")
# Mask out x, y coordinates
payload &= self.__coordinate_mask
# Increment these pixels
self.__image_data[payload] += 1.0
def _updatefig(self):
# Read all UDP messages received during last frame
while True:
try:
self._parse_raw_data(self.__recv_data())
except socket.error:
# Stop reading
break
# Decay image data
self.__image_data *= self.__decay_proportion
# Set image data
self.__image.set_array(self.__image_data_view)
return [self.__image]
[docs] def run(self):
""" How the viewer works when the thread is running.
"""
# Create image plot of retina output
fig = self.__pyplot.figure()
self.__image = self.__pyplot.imshow(
self.__image_data_view, cmap="viridis", vmin=0.0,
vmax=self.__display_max)
# Play animation
self.__ani = self.__animation.FuncAnimation(
fig, (lambda _frame: self._updatefig()),
interval=self.__frame_time_ms, blit=True)
self.__pyplot.show()