1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 11:12:09 +01:00

Compare commits

...

8 Commits

6 changed files with 178 additions and 156 deletions

Binary file not shown.

View File

@ -1,3 +1,11 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np
@ -62,7 +70,7 @@ def extract_waveform(series, period):
p = int(round(series.sample_rate * period))
n = len(series.samples) // p
if n <= 2:
return None, None
return None, None, None, None
samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p
@ -94,7 +102,7 @@ def normalize_waveform(samples, smooth=7):
crossings.append((i - last_rising, last_rising))
if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising))
width, first = min(crossings)
first = min(crossings)[1]
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
@ -104,7 +112,7 @@ def characterize_waveform(samples, crossings):
possibles = []
if len(crossings) == 1:
duty_cycle = crossings[0][1] / n
if duty_cycle > 0.45 and duty_cycle < 0.55:
if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))

128
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3
"""
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse
import array
@ -43,8 +52,8 @@ class Scope(vm.VirtualMachine):
break
else:
raise RuntimeError("No matching serial device found")
Log.info(f"Connecting to scope at {url}")
self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file')
if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -63,8 +72,8 @@ class Scope(vm.VirtualMachine):
await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501':
self.master_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate
self.primary_clock_rate = 40000000
self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024
@ -74,8 +83,8 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07
self.analog_hi_max = 0.88
self.timeout_clock_period = (1 << 8) * self.master_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period
self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else:
raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False
@ -115,7 +124,7 @@ class Scope(vm.VirtualMachine):
self.close()
def close(self):
super().close()
if super().close():
Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params):
@ -160,14 +169,16 @@ class Scope(vm.VirtualMachine):
logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples))
ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1):
clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks > capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else:
continue
@ -178,23 +189,23 @@ class Scope(vm.VirtualMachine):
else:
Log.debug("- mode too slow")
continue
n = int(round(period / self.master_clock_period / ticks / clock_scale))
actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2:
n -= n % 2
actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels:
buffer_width //= 2
if n <= buffer_width:
Log.debug(f"- OK; period is {n} samples")
nsamples = n
if actual_nsamples <= buffer_width:
Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = actual_nsamples
break
Log.debug(f"- insufficient buffer space for necessary {n} samples")
Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else:
raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period
sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6:
Log.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0
if raw:
@ -225,21 +236,13 @@ class Scope(vm.VirtualMachine):
if trigger_level is None:
trigger_level = (high + low) / 2
analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
if trigger == 'A' or trigger == 'B':
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
if isinstance(trigger, dict):
trigger_logic = 0
trigger_mask = 0xff
for channel, value in trigger.items():
if isinstance(channel, str):
if channel.startswith('L'):
channel = int(channel[1:])
channel = int(channel[1:]) # noqa
else:
raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7:
@ -248,6 +251,14 @@ class Scope(vm.VirtualMachine):
trigger_mask &= ~mask
if value:
trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
else:
raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower()
@ -263,12 +274,11 @@ class Scope(vm.VirtualMachine):
if timeout is None:
trigger_timeout = 0
else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0:
raise ConfigurationError("Required trigger timeout too long")
else:
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
@ -315,7 +325,7 @@ class Scope(vm.VirtualMachine):
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels),
@ -337,7 +347,7 @@ class Scope(vm.VirtualMachine):
mask = 1 << i
channel = f'L{i}'
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period,
@ -360,19 +370,19 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high:
raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size)))
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None
for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock
width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock
actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency:
Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break
error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]):
if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf
else:
if best_solution is None:
@ -422,9 +432,9 @@ class Scope(vm.VirtualMachine):
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks
actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction():
@ -495,11 +505,11 @@ class Scope(vm.VirtualMachine):
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n):
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero:
if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T
lo, hi, low, high, offset = np.array(items).T # noqa
def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
@ -514,7 +524,7 @@ class Scope(vm.VirtualMachine):
Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x):
def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
@ -536,23 +546,21 @@ class Scope(vm.VirtualMachine):
return f"<Scope {self.url}>"
"""
$ ipython3 --pylab
Using matplotlib backend: MacOSX
In [1]: run scope
In [2]: start_waveform(2000, 'triangle')
Out[2]: 2000.0
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [4]: plot(traces.A.timestamps, traces.A.samples)
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
In [5]: plot(traces.B.timestamps, traces.B.samples)
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
# $ ipython3 --pylab
# Using matplotlib backend: MacOSX
#
# In [1]: run scope
#
# In [2]: start_waveform(2000, 'triangle')
# Out[2]: 2000.0
#
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
#
# In [4]: plot(traces.A.timestamps, traces.A.samples)
# Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
#
# In [5]: plot(traces.B.timestamps, traces.B.samples)
# Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
async def main():

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO.
"""
# pylama:ignore=W1203,R0916,W0703
import asyncio
import logging
import sys
@ -20,14 +22,14 @@ Log = logging.getLogger(__name__)
class SerialStream:
@classmethod
def devices_matching(cls, vid=None, pid=None, serial=None):
def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number):
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device
@classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial):
def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device")
@ -59,15 +61,15 @@ class SerialStream:
return
if not self._output_buffer:
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
except serial.SerialTimeoutException:
n = 0
nbytes = 0
except Exception:
Log.exception("Error writing to stream")
raise
if n:
Log.debug(f"Write {data[:n]!r}")
self._output_buffer = data[n:]
if nbytes:
Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[nbytes:]
else:
self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self):
try:
n = self._connection.write(self._output_buffer)
nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException:
n = 0
except Exception as e:
nbytes = 0
except Exception as exc:
Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e)
self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection)
if n:
Log.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
if nbytes:
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer:
self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None)
@ -101,23 +103,22 @@ class SerialStream:
data = bytes(self._output_buffer)
self._output_buffer_lock.release()
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
finally:
self._output_buffer_lock.acquire()
Log.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None
async def read(self, n=None):
async def read(self, nbytes=None):
if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n)
return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True:
w = self._connection.in_waiting
if w:
data = self._connection.read(w if n is None else min(n, w))
nwaiting = self._connection.in_waiting
if nwaiting:
data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
Log.debug(f"Read {data!r}")
return data
else:
future = self._loop.create_future()
self._loop.add_reader(self._connection, future.set_result, None)
try:
@ -125,16 +126,16 @@ class SerialStream:
finally:
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None):
def _read_blocking(self, nbytes=None):
data = self._connection.read(1)
w = self._connection.in_waiting
if w and (n is None or n > 1):
data += self._connection.read(w if n is None else min(n-1, w))
nwaiting = self._connection.in_waiting
if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
Log.debug(f"Read {data!r}")
return data
async def readexactly(self, n):
async def readexactly(self, nbytes):
data = b''
while len(data) < n:
data += await self.read(n-len(data))
while len(data) < nbytes:
data += await self.read(nbytes-len(data))
return data

View File

@ -1,3 +1,9 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict):

99
vm.py
View File

@ -1,4 +1,3 @@
"""
vm
==
@ -7,12 +6,14 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B].
document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
"""
# pylama:ignore=E221,C0326,R0904,W1203
import array
from collections import namedtuple
from enum import IntEnum
@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else:
width = int(self.dtype[1:])
if sign == 'U':
n = 1 << width
value = max(0, min(value, n-1))
bs = struct.pack('<I', value)
max_value = (1 << width) - 1
value = min(max(0, value), max_value)
data = struct.pack('<I', value)
elif sign == 'S':
n = 1 << (width - 1)
value = max(-n, min(value, n-1))
bs = struct.pack('<i', value)
max_value = (1 << (width - 1))
value = min(max(-max_value, value), max_value - 1)
data = struct.pack('<i', value)
else:
raise TypeError("Unrecognised dtype")
return bs[:width//8]
return data[:width//8]
def decode(self, bs):
if len(bs) < 4:
bs = bs + bytes(4 - len(bs))
def decode(self, data):
if len(data) < 4:
data = data + bytes(4 - len(data))
sign = self.dtype[0]
if sign == 'U':
value = struct.unpack('<I', bs)[0]
value = struct.unpack('<I', data)[0]
elif sign == 'S':
value = struct.unpack('<i', bs)[0]
value = struct.unpack('<i', data)[0]
else:
raise TypeError("Unrecognised dtype")
if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1))
value = value / (1 << fraction)
fraction = int(self.dtype.split('.', 1)[1])
value /= (1 << fraction)
return value
@property
@ -69,14 +70,13 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S':
whole -= 1
n = (1 << (whole+fraction)) - 1
return n / (1 << fraction) if fraction else n
max_value = (1 << (whole+fraction)) - 1
return max_value / (1 << fraction) if fraction else max_value
@property
def width(self):
if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else:
return int(self.dtype[1:]) // 8
@ -90,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"),
"ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -151,13 +151,11 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
"PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
})
# pylama:ignore=E221
class TraceMode(IntEnum):
Analog = 0
Mixed = 1
@ -254,12 +252,12 @@ class VirtualMachine:
class Transaction:
def __init__(self, vm):
self._vm = vm
self._data = b''
def append(self, cmd):
self._data += cmd
async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self)
return self
@ -281,6 +279,8 @@ class VirtualMachine:
self._writer.close()
self._writer = None
self._reader = None
return True
return False
__del__ = close
@ -300,12 +300,12 @@ class VirtualMachine:
else:
self._transactions[-1].append(cmd)
async def read_replies(self, n):
async def read_replies(self, nreplies):
if self._transactions:
raise TypeError("Command transaction in progress")
replies = []
data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n:
while len(replies) < nreplies:
index = data.find(b'\r')
if index >= 0:
reply = data[:index]
@ -331,25 +331,25 @@ class VirtualMachine:
async def set_registers(self, **kwargs):
cmd = ''
r0 = r1 = None
register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name]
bs = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
for i, byte in enumerate(bs):
data = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(data):
if cmd:
cmd += 'z'
r1 += 1
register1 += 1
address = base + i
if r1 is None or address > r1 + 3:
if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@'
r0 = r1 = address
register0 = register1 = address
else:
cmd += 'n' * (address - r1)
r1 = address
if byte != r0:
cmd += 'n' * (address - register1)
register1 = address
if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte
register0 = byte
if cmd:
await self.issue(cmd + 's')
@ -382,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self):
await self.issue(b'D')
async def read_analog_samples(self, n, sample_width):
async def read_analog_samples(self, nsamples, sample_width):
if self._transactions:
raise TypeError("Command transaction in progress")
if sample_width == 2:
data = await self._reader.readexactly(2*n)
data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1:
data = await self._reader.readexactly(n)
if sample_width == 1:
data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data))
else:
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n):
async def read_logic_samples(self, nsamples):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self):
await self.issue(b'K')
@ -411,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self):
await self.issue(b'R')
async def wavetable_read_bytes(self, n):
async def wavetable_read_bytes(self, nbytes):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs):
async def wavetable_write_bytes(self, data):
cmd = ''
last_byte = None
for byte in bs:
for byte in data:
if byte != last_byte:
cmd += f'{byte:02x}'
cmd += 'W'