Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions modalapi/modhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ def __init__(self, audiocard: Audiocard, homedir, data_dir="/home/pistomp/data")
self.ws_bridge.start()
logging.info("WebSocket bridge started")

# Tuner state (may be disabled on slow hardware)
self.tuner_supported: bool = True
# Tuner state
self._tuner_engine: TunerEngine | None = None
self._tuner_panel: TunerPanel | None = None
self._tuner_source_factory: TunerSourceFactory | None = None
Expand Down Expand Up @@ -278,13 +277,10 @@ def poll_lcd_updates(self):

@property
def lcd_poll_divisor(self) -> int:
# Tick the LCD on every 10 ms main-loop pass (~100 fps) while the
# tuner panel is mounted. Strobe's worst-case redraw at STRIPE_W=4
# is ~4.3 ms of SPI, well inside the 10 ms budget; typical ticks
# are sub-millisecond. Otherwise fall back to the SPI-clock-derived
# divisor computed by the LCD itself.
# 50 fps (every other 10 ms tick) while the tuner is active — fast enough
# for smooth strobe animation and leaves headroom for SPI transfers.
if self._tuner_panel is not None:
return 1
return 2
return self._lcd.poll_divisor if self._lcd is not None else 8

def universal_encoder_select(self, direction):
Expand Down Expand Up @@ -1050,8 +1046,6 @@ def _tuner_factory(self, port: str) -> AudioSource:
return factory(port, name=f"pistomp-tuner-{port.split('_')[-1]}")

def toggle_tuner_enable(self, *argv) -> None:
if not self.tuner_supported:
return
if self._tuner_engine is None:
muted = bool(self.settings.get_setting(Token.TUNER_MUTE))
input_port = int(self.settings.get_setting(Token.TUNER_INPUT) or 1)
Expand Down
1 change: 0 additions & 1 deletion pistomp/handlerfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def create(self, cfg, audiocard, cwd):
handler = Mod.Mod(audiocard, cwd)
elif (version >= 2.0) and (version < 3.0):
handler = Modhandler.Modhandler(audiocard, cwd)
handler.tuner_supported = False
elif (version >= 3.0) and (version < 4.0):
handler = Modhandler.Modhandler(audiocard, cwd)
else:
Expand Down
7 changes: 3 additions & 4 deletions pistomp/lcd320x240.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,9 @@ def get_footswitch_pitch(self):
# System Menu
#
def draw_system_menu(self, event, widget):
items = [("System info", self.draw_system_info_dialog, None)]
if self.handler and self.handler.tuner_supported:
items.append(("Tuner", self._toggle_tuner_from_menu, None))
items += [("System shutdown", self.handler.system_menu_shutdown, None),
items = [("System info", self.draw_system_info_dialog, None),
("Tuner", self._toggle_tuner_from_menu, None),
("System shutdown", self.handler.system_menu_shutdown, None),
("System reboot", self.handler.system_menu_reboot, None),
("Restart sound engine", self.handler.system_menu_restart_sound, None),
("Bank Select >", self.draw_bank_menu, None),
Expand Down
16 changes: 11 additions & 5 deletions pistomp/tuner/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from pistomp.tuner.ringbuffer import RingBuffer
from pistomp.tuner.source import AudioSource
from pistomp.tuner.yin import detect_pitch
from pistomp.tuner.yin import YinDetector

_NOTE_NAMES = ["C", "C\u266f", "D", "D\u266f", "E", "F", "F\u266f", "G", "G\u266f", "A", "A\u266f", "B"]
_A4_HZ = 440.0
Expand Down Expand Up @@ -81,10 +81,16 @@ def __init__(
self._freq_history: deque[float] = deque(maxlen=self.MEDIAN_LEN)
self._prev_rms: float = 0.0
self._onset_holdoff: int = 0
self._detector: YinDetector | None = None

def start(self) -> None:
self._running = True
self._source.start(on_samples=self._ring.write)
lo, hi = self._freq_bounds
self._detector = YinDetector(
self.FRAME_SIZE, self._source.sample_rate,
freq_min=lo, freq_max=hi, window=self.YIN_WINDOW,
)
self._worker = threading.Thread(target=self._dsp_loop, daemon=True, name="tuner-dsp")
self._worker.start()

Expand All @@ -94,6 +100,7 @@ def stop(self) -> None:
if self._worker is not None:
self._worker.join(timeout=2.0)
self._worker = None
self._detector = None

def _dsp_loop(self) -> None:
interval = 1.0 / self.DSP_RATE_HZ
Expand All @@ -109,7 +116,7 @@ def _process(self) -> None:
if not self._ring.read_latest(self.FRAME_SIZE, self._frame):
return

rms = float(np.sqrt(np.mean(self._frame.astype(np.float64) ** 2)))
rms = float(np.sqrt(np.mean(self._frame ** 2)))

if rms < self.SILENCE_RMS:
self._freq_history.clear()
Expand All @@ -132,9 +139,8 @@ def _process(self) -> None:
self._onset_holdoff -= 1
return

sr = self._source.sample_rate
lo, hi = self._freq_bounds
estimate = detect_pitch(self._frame, sr, freq_min=lo, freq_max=hi, window=self.YIN_WINDOW)
assert self._detector is not None
estimate = self._detector.detect(self._frame)
if estimate is None:
return

Expand Down
125 changes: 60 additions & 65 deletions pistomp/tuner/panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ def _zone(cents: float) -> Zone:
return "red"


def _zone_color(cents: float) -> Color:
return _ZONE_COLORS[_zone(cents)]


# ── TunerHeaderWidget ────────────────────────────────────────────────────────


Expand Down Expand Up @@ -213,6 +209,7 @@ def __init__(self, box: Box, **kwargs) -> None:
self._stripe_color: Color = _ZONE_COLORS["accent"]
self._last_tick = time.monotonic()
self._has_reading = False
self._pending: Box | None = None

# ── drawing ──────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -253,85 +250,83 @@ def _paint_overlap(self, draw, sx: int, sw: int, rx0: int, rx1: int, y0: int, y1
if wx0 < wx1:
draw.rectangle([wx0, y0, wx1 - 1, y1], fill=self._stripe_color)

# ── partial-column refresh ────────────────────────────────────────────────
# ── batched partial-column refresh ─────────────────────────────────────────

def _refresh_col(self, x: int, w: int) -> None:
"""Refresh a w-pixel-wide column at x (with wrap at _W), full widget height."""
if w <= 0:
return
def _flush_spans(self, spans: list[tuple[int, int]]) -> None:
"""Union (x, w) column spans (wrapping at _W) into self._pending.
tick() issues a single refresh() of the accumulated box each call."""
bx = self.box
if bx is None:
return
if x + w <= _W:
self.refresh(Box(x, bx.y0, x + w, bx.y1))
else:
right_w = _W - x
if right_w > 0:
self.refresh(Box(x, bx.y0, _W, bx.y1))
wrap_w = w - right_w
if wrap_w > 0:
self.refresh(Box(0, bx.y0, wrap_w, bx.y1))

def _refresh_stripes_at(self, phase_int: int) -> None:
for i in range(self.N_STRIPES):
sx = (phase_int + i * self.STRIPE_P) % _W
self._refresh_col(sx, self.STRIPE_W)
for x, w in spans:
if w <= 0:
continue
x %= _W
end = x + w
self._pending = (
Box(x, bx.y0, min(end, _W), bx.y1)
if self._pending is None
else self._pending.union(Box(x, bx.y0, min(end, _W), bx.y1))
)
if end > _W:
wrap = Box(0, bx.y0, end - _W, bx.y1)
self._pending = self._pending.union(wrap)

def _stripe_spans_at(self, phase_int: int) -> list[tuple[int, int]]:
return [((phase_int + i * self.STRIPE_P) % _W, self.STRIPE_W) for i in range(self.N_STRIPES)]

# ── tick ─────────────────────────────────────────────────────────────────

def tick(self, cents: float | None) -> None:
now = time.monotonic()
dt = min(now - self._last_tick, 0.5)
self._last_tick = now
self._pending = None

if cents is None:
if self._has_reading:
self._has_reading = False
self._zone = "accent"
self._stripe_color = _ZONE_COLORS["accent"]
self._refresh_stripes_at(int(self._phase))
return

if not self._has_reading:
self._flush_spans(self._stripe_spans_at(int(self._phase)))
elif not self._has_reading:
self._has_reading = True
self._refresh_stripes_at(int(self._phase))
return

new_zone: Zone = _zone(cents)
if new_zone != self._zone:
self._zone = new_zone
self._stripe_color = _zone_color(cents)
self._refresh_stripes_at(int(self._phase))
return

if self._zone == "in_tune":
return # frozen — zero SPI writes

K = (self.STRIPE_P / 50.0) * self.VELOCITY_SCALE
velocity = max(-50.0, min(50.0, cents)) * K
old_phase_int = int(self._phase)
self._phase = (self._phase + velocity * dt) % float(_W)
k = int(self._phase) - old_phase_int

if k == 0:
return

if abs(k) >= self.STRIPE_W:
self._refresh_stripes_at(old_phase_int)
self._refresh_stripes_at(int(self._phase))
return

ak = abs(k)
for i in range(self.N_STRIPES):
old_sx = (old_phase_int + i * self.STRIPE_P) % _W
if k > 0:
tail_x = old_sx
lead_x = (old_sx + self.STRIPE_W) % _W
else:
tail_x = (old_sx + self.STRIPE_W - ak) % _W
lead_x = (old_sx - ak) % _W
self._refresh_col(tail_x, ak)
self._refresh_col(lead_x, ak)
self._flush_spans(self._stripe_spans_at(int(self._phase)))
else:
new_zone: Zone = _zone(cents)
if new_zone != self._zone:
self._zone = new_zone
self._stripe_color = _ZONE_COLORS[new_zone]
self._flush_spans(self._stripe_spans_at(int(self._phase)))
elif self._zone != "in_tune":
K = (self.STRIPE_P / 50.0) * self.VELOCITY_SCALE
velocity = max(-50.0, min(50.0, cents)) * K
old_phase_int = int(self._phase)
self._phase = (self._phase + velocity * dt) % float(_W)
k = int(self._phase) - old_phase_int

if k != 0:
if abs(k) >= self.STRIPE_W:
self._flush_spans(
self._stripe_spans_at(old_phase_int) + self._stripe_spans_at(int(self._phase))
)
else:
ak = abs(k)
spans: list[tuple[int, int]] = []
for i in range(self.N_STRIPES):
old_sx = (old_phase_int + i * self.STRIPE_P) % _W
if k > 0:
tail_x = old_sx
lead_x = (old_sx + self.STRIPE_W) % _W
else:
tail_x = (old_sx + self.STRIPE_W - ak) % _W
lead_x = (old_sx - ak) % _W
spans.append((tail_x, ak))
spans.append((lead_x, ak))
self._flush_spans(spans)

if self._pending is not None:
self.refresh(self._pending)


# ── TunerPanel ───────────────────────────────────────────────────────────────
Expand Down
Loading