Skip to content

Commit

Permalink
Merge pull request #1535 from SpiNNakerManchester/import_direct
Browse files Browse the repository at this point in the history
don't pass a module as a parameter
  • Loading branch information
rowleya authored Feb 3, 2025
2 parents 8862977 + 7eff766 commit f121a49
Showing 1 changed file with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
import logging
from threading import Thread, RLock
from time import sleep
from typing import Any, List

from matplotlib import pyplot # type: ignore[import]
import numpy

from spinn_utilities.log import FormatAdapter

import spynnaker.pyNN.external_devices as external_devices
from spynnaker.pyNN.external_devices_models.push_bot.parameters import (
PushBotRetinaResolution)
from spynnaker.pyNN.connections import SpynnakerLiveSpikesConnection

_logger = FormatAdapter(logging.getLogger(__name__))
MAX_VALUE = 33.0
ADD_VALUE = 1.0
Expand All @@ -35,9 +41,12 @@ class PushBotRetinaViewer():
"__image_data", "__image_lock",
"__without_polarity_mask", "__height",
"__fig", "__plot",
"__running", "__sim", "__conn")
"__running", "__conn")

def __init__(self, retina_resolution, label, sim):
def __init__(self, retina_resolution: PushBotRetinaResolution,
label: str, sim: None = None):
if sim is not None:
_logger.warning("PushBotRetinaViewer: sim=None is deprecated")
pyplot.ion()
self.__image_data = numpy.zeros(
(retina_resolution.value.pixels, retina_resolution.value.pixels),
Expand All @@ -56,13 +65,12 @@ def __init__(self, retina_resolution, label, sim):
self.__running = True
self.__image_lock = RLock()

self.__sim = sim
self.__conn = sim.external_devices.SpynnakerLiveSpikesConnection(
self.__conn = SpynnakerLiveSpikesConnection(
receive_labels=[label], local_port=None)
self.__conn.add_receive_callback(label, self.__recv)

@property
def port(self):
def port(self) -> int:
"""
The port the connection is listening on.
Expand All @@ -71,34 +79,40 @@ def port(self):
return self.__conn.local_port

# pylint: disable=unused-argument
def __recv(self, label, time, spikes):
def __recv(self, label: str, time: int, spikes: List[int]) -> None:
np_spikes = numpy.array(spikes) & self.__without_polarity_mask
x_vals, y_vals = numpy.divmod(np_spikes, self.__height)
self.__image_lock.acquire()
self.__image_data[x_vals, y_vals] += 1.0
self.__image_lock.release()

def __run_sim_forever(self):
def __run_sim_forever(self) -> None:
# UGLY but needed to avoid circular import
# pylint: disable=import-outside-toplevel
from pyNN.spiNNaker import end
try:
self.__sim.external_devices.run_forever()
external_devices.run_forever()
self.__running = False
self.__sim.end()
end()
except KeyboardInterrupt:
pass
except Exception: # pylint: disable=broad-except
_logger.exception("unexpected exception in simulation thread")

def __run_sim(self, run_time):
def __run_sim(self, run_time: float) -> None:
# UGLY but needed to avoid circular import
# pylint: disable=import-outside-toplevel
from pyNN.spiNNaker import end, run
try:
self.__sim.run(run_time)
run(run_time)
self.__running = False
self.__sim.end()
end()
except KeyboardInterrupt:
pass
except Exception: # pylint: disable=broad-except
_logger.exception("unexpected exception in simulation thread")

def __run(self, run_thread):
def __run(self, run_thread: Any) -> None:
try:
while self.__running and self.__fig.get_visible():
self.__image_lock.acquire()
Expand All @@ -113,10 +127,10 @@ def __run(self, run_thread):
except Exception: # pylint: disable=broad-except
_logger.exception("unexpected exception in drawing thread")

def __on_close(self, event):
def __on_close(self, event: Any) -> None:
self.__running = False

def run_until_closed(self):
def run_until_closed(self) -> None:
"""
Run the viewer and simulation until the viewer is closed.
"""
Expand All @@ -126,10 +140,10 @@ def run_until_closed(self):
self.__fig.canvas.mpl_connect('close_event', self.__on_close)
self.__run(run_thread)
finally:
self.__sim.external_devices.request_stop()
external_devices.request_stop()
run_thread.join()

def run(self, run_time):
def run(self, run_time: float) -> None:
"""
Run the viewer and simulation for a fixed time.
"""
Expand Down

0 comments on commit f121a49

Please sign in to comment.