1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 03:02:09 +01:00

Compare commits

...

10 Commits

8 changed files with 236 additions and 172 deletions

View File

@ -8,3 +8,58 @@
- Also requires **NumPy** and **SciPy** if you want to do analog range calibration
- Having **Pandas** is useful for wrapping capture results for further processing
## Longer Notes
### Why have I written this?
BitScope helpfully provide applications and libraries for talking to their USB
capture devices, so one can just use those. I wrote this code because I want
to be able to grab the raw data and further process it in various ways. I'm
accustomed to working at a Python prompt with various toolkits like SciPy and
Matplotlib, so I wanted a simple way to just grab a trace as an array.
The BitScope library is pretty simple to use, but also requires you to
understand a fair amount about how the scope works and make a bunch of decisions
about what capture mode and rate to use. I want to just specify a time period,
voltage range and a rough number of samples and have all of that worked out for
me, same way as I'd use an actual oscilloscope: twiddle the knobs and look at
the trace.
Of course, I could have wrapped the BitScope library in something that would do
this, but after reading a bit about how the scope works I was fascinated with
understanding it further and so I decided to go back to first principles and
start with just talking to it with a serial library. This code thus serves as
(sort of) documentation for the VM registers, capture modes and how to use them.
It also has the advantage of being pure Python.
The code prefers the highest capture resolution possible and will do the mapping
from high/low/trigger voltages to the mysterious magic numbers that the device
needs. It can also do logic and mixed-signal capture.
In addition to capturing, the code can also generate waveforms at arbitrary
frequencies  something that is tricky to do as the device operates at specific
frequencies and so one has to massage the width of the waveform buffer to get a
frequency outside of these. It can also control the clock generator.
I've gone for an underlying async design as it makes it easy to integrate the
code into UI programs or network servers  both of which interest me as the end
purpose for this code. However, for shell use there are synchronous wrapper
functions. Of particular note is that the synchronous wrapper understands
keyboard interrupt and will cancel a capture returning the trace around the
cancel point. This is useful if your trigger doesn't fire and you want to
understand why.
### Where's the documentation, mate?
Yeah, yeah. I know.
### Also, I see no unit tests...
It's pretty hard to do unit tests for a physical device. That's my excuse and
I'm sticking to it.
### Long lines and ignoring E221, eh?
"A foolish consistency is the hobgoblin of little minds"
Also, I haven't used an 80 character wide terminal in this century.

Binary file not shown.

View File

@ -1,3 +1,11 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np
@ -62,7 +70,7 @@ def extract_waveform(series, period):
p = int(round(series.sample_rate * period))
n = len(series.samples) // p
if n <= 2:
return None, None
return None, None, None, None
samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p
@ -94,7 +102,7 @@ def normalize_waveform(samples, smooth=7):
crossings.append((i - last_rising, last_rising))
if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising))
width, first = min(crossings)
first = min(crossings)[1]
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
@ -104,7 +112,7 @@ def characterize_waveform(samples, crossings):
possibles = []
if len(crossings) == 1:
duty_cycle = crossings[0][1] / n
if duty_cycle > 0.45 and duty_cycle < 0.55:
if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))

134
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3
"""
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse
import array
@ -43,8 +52,8 @@ class Scope(vm.VirtualMachine):
break
else:
raise RuntimeError("No matching serial device found")
Log.info(f"Connecting to scope at {url}")
self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file')
if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -63,8 +72,8 @@ class Scope(vm.VirtualMachine):
await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501':
self.master_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate
self.primary_clock_rate = 40000000
self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024
@ -74,8 +83,8 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07
self.analog_hi_max = 0.88
self.timeout_clock_period = (1 << 8) * self.master_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period
self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else:
raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False
@ -115,8 +124,8 @@ class Scope(vm.VirtualMachine):
self.close()
def close(self):
super().close()
Log.info("Closed scope")
if super().close():
Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams):
@ -160,14 +169,16 @@ class Scope(vm.VirtualMachine):
logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples))
ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1):
clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks > capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else:
continue
@ -178,23 +189,23 @@ class Scope(vm.VirtualMachine):
else:
Log.debug("- mode too slow")
continue
n = int(round(period / self.master_clock_period / ticks / clock_scale))
actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2:
n -= n % 2
actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels:
buffer_width //= 2
if n <= buffer_width:
Log.debug(f"- OK; period is {n} samples")
nsamples = n
if actual_nsamples <= buffer_width:
Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = actual_nsamples
break
Log.debug(f"- insufficient buffer space for necessary {n} samples")
Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else:
raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period
sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6:
Log.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0
if raw:
@ -225,21 +236,13 @@ class Scope(vm.VirtualMachine):
if trigger_level is None:
trigger_level = (high + low) / 2
analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
if trigger == 'A' or trigger == 'B':
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
if isinstance(trigger, dict):
trigger_logic = 0
trigger_mask = 0xff
for channel, value in trigger.items():
if isinstance(channel, str):
if channel.startswith('L'):
channel = int(channel[1:])
channel = int(channel[1:]) # noqa
else:
raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7:
@ -248,6 +251,14 @@ class Scope(vm.VirtualMachine):
trigger_mask &= ~mask
if value:
trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
else:
raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower()
@ -263,13 +274,12 @@ class Scope(vm.VirtualMachine):
if timeout is None:
trigger_timeout = 0
else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0:
raise ConfigurationError("Required trigger timeout too long")
else:
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
@ -315,7 +325,7 @@ class Scope(vm.VirtualMachine):
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels),
@ -337,7 +347,7 @@ class Scope(vm.VirtualMachine):
mask = 1 << i
channel = f'L{i}'
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period,
@ -347,7 +357,7 @@ class Scope(vm.VirtualMachine):
series.trigger_timestamp = series.timestamps[trigger_samples]
series.trigger_level = trigger[i]
series.trigger_type = trigger_type
traces[channel] = series
traces[channel] = series
Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
return traces
@ -360,19 +370,19 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high:
raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size)))
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None
for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock
width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock
actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency:
Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break
error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]):
if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf
else:
if best_solution is None:
@ -422,9 +432,9 @@ class Scope(vm.VirtualMachine):
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks
actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction():
@ -495,11 +505,11 @@ class Scope(vm.VirtualMachine):
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n):
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero:
if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T
lo, hi, low, high, offset = np.array(items).T # noqa
def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
@ -514,7 +524,7 @@ class Scope(vm.VirtualMachine):
Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x):
def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
@ -536,23 +546,21 @@ class Scope(vm.VirtualMachine):
return f"<Scope {self.url}>"
"""
$ ipython3 --pylab
Using matplotlib backend: MacOSX
In [1]: run scope
In [2]: start_waveform(2000, 'triangle')
Out[2]: 2000.0
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [4]: plot(traces.A.timestamps, traces.A.samples)
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
In [5]: plot(traces.B.timestamps, traces.B.samples)
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
# $ ipython3 --pylab
# Using matplotlib backend: MacOSX
#
# In [1]: run scope
#
# In [2]: start_waveform(2000, 'triangle')
# Out[2]: 2000.0
#
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
#
# In [4]: plot(traces.A.timestamps, traces.A.samples)
# Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
#
# In [5]: plot(traces.B.timestamps, traces.B.samples)
# Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
async def main():

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO.
"""
# pylama:ignore=W1203,R0916,W0703
import asyncio
import logging
import sys
@ -20,14 +22,14 @@ Log = logging.getLogger(__name__)
class SerialStream:
@classmethod
def devices_matching(cls, vid=None, pid=None, serial=None):
def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number):
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device
@classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial):
def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device")
@ -59,15 +61,15 @@ class SerialStream:
return
if not self._output_buffer:
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
except serial.SerialTimeoutException:
n = 0
nbytes = 0
except Exception:
Log.exception("Error writing to stream")
raise
if n:
Log.debug(f"Write {data[:n]!r}")
self._output_buffer = data[n:]
if nbytes:
Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[nbytes:]
else:
self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self):
try:
n = self._connection.write(self._output_buffer)
nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException:
n = 0
except Exception as e:
nbytes = 0
except Exception as exc:
Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e)
self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection)
if n:
Log.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
if nbytes:
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer:
self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None)
@ -101,40 +103,39 @@ class SerialStream:
data = bytes(self._output_buffer)
self._output_buffer_lock.release()
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
finally:
self._output_buffer_lock.acquire()
Log.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None
async def read(self, n=None):
async def read(self, nbytes=None):
if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n)
return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True:
w = self._connection.in_waiting
if w:
data = self._connection.read(w if n is None else min(n, w))
nwaiting = self._connection.in_waiting
if nwaiting:
data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
Log.debug(f"Read {data!r}")
return data
else:
future = self._loop.create_future()
self._loop.add_reader(self._connection, future.set_result, None)
try:
await future
finally:
self._loop.remove_reader(self._connection)
future = self._loop.create_future()
self._loop.add_reader(self._connection, future.set_result, None)
try:
await future
finally:
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None):
def _read_blocking(self, nbytes=None):
data = self._connection.read(1)
w = self._connection.in_waiting
if w and (n is None or n > 1):
data += self._connection.read(w if n is None else min(n-1, w))
nwaiting = self._connection.in_waiting
if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
Log.debug(f"Read {data!r}")
return data
async def readexactly(self, n):
async def readexactly(self, nbytes):
data = b''
while len(data) < n:
data += await self.read(n-len(data))
while len(data) < nbytes:
data += await self.read(nbytes-len(data))
return data

19
test.py
View File

@ -1,26 +1,12 @@
import numpy as np
from pylab import figure, plot, show
from analysis import annotate_series
from scope import await_, capture, main
from utils import DotDict
await_(main())
# o = 400
# m = 5
# n = o * m
# samples = square_wave(o)
# samples = np.hstack([samples] * m) * 2
# samples = np.hstack([samples[100:], samples[:100]])
# samples += np.random.normal(size=n) * 0.1
# samples += np.linspace(4.5, 5.5, n)
# series = DotDict(samples=samples, sample_rate=1000000)
data = capture(['A'], period=20e-3, nsamples=2000)
series = data.A
series = capture(['A'], period=20e-3, nsamples=2000).A
figure(1)
plot(series.timestamps, series.samples)
@ -35,6 +21,7 @@ if annotate_series(series):
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with amplitude ±{waveform.amplitude:.2f}V and offset {waveform.offset:.2f}V")
plot(waveform.timestamps + waveform.capture_start - series.capture_start, waveform.samples * waveform.amplitude + waveform.offset)
plot(waveform.timestamps + waveform.capture_start - series.capture_start,
waveform.samples * waveform.amplitude + waveform.offset)
show()

View File

@ -1,3 +1,9 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict):

103
vm.py
View File

@ -1,4 +1,3 @@
"""
vm
==
@ -7,12 +6,14 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B].
document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
"""
# pylama:ignore=E221,C0326,R0904,W1203
import array
from collections import namedtuple
from enum import IntEnum
@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else:
width = int(self.dtype[1:])
if sign == 'U':
n = 1 << width
value = max(0, min(value, n-1))
bs = struct.pack('<I', value)
max_value = (1 << width) - 1
value = min(max(0, value), max_value)
data = struct.pack('<I', value)
elif sign == 'S':
n = 1 << (width - 1)
value = max(-n, min(value, n-1))
bs = struct.pack('<i', value)
max_value = (1 << (width - 1))
value = min(max(-max_value, value), max_value - 1)
data = struct.pack('<i', value)
else:
raise TypeError("Unrecognised dtype")
return bs[:width//8]
return data[:width//8]
def decode(self, bs):
if len(bs) < 4:
bs = bs + bytes(4 - len(bs))
def decode(self, data):
if len(data) < 4:
data = data + bytes(4 - len(data))
sign = self.dtype[0]
if sign == 'U':
value = struct.unpack('<I', bs)[0]
value = struct.unpack('<I', data)[0]
elif sign == 'S':
value = struct.unpack('<i', bs)[0]
value = struct.unpack('<i', data)[0]
else:
raise TypeError("Unrecognised dtype")
if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1))
value = value / (1 << fraction)
fraction = int(self.dtype.split('.', 1)[1])
value /= (1 << fraction)
return value
@property
@ -69,15 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S':
whole -= 1
n = (1 << (whole+fraction)) - 1
return n / (1 << fraction) if fraction else n
max_value = (1 << (whole+fraction)) - 1
return max_value / (1 << fraction) if fraction else max_value
@property
def width(self):
if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else:
return int(self.dtype[1:]) // 8
return int(self.dtype[1:]) // 8
Registers = DotDict({
@ -90,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"),
"ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -151,13 +151,11 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
"PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
})
# pylama:ignore=E221
class TraceMode(IntEnum):
Analog = 0
Mixed = 1
@ -254,12 +252,12 @@ class VirtualMachine:
class Transaction:
def __init__(self, vm):
self._vm = vm
self._data = b''
def append(self, cmd):
self._data += cmd
async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self)
return self
@ -281,6 +279,8 @@ class VirtualMachine:
self._writer.close()
self._writer = None
self._reader = None
return True
return False
__del__ = close
@ -300,12 +300,12 @@ class VirtualMachine:
else:
self._transactions[-1].append(cmd)
async def read_replies(self, n):
async def read_replies(self, nreplies):
if self._transactions:
raise TypeError("Command transaction in progress")
replies = []
data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n:
while len(replies) < nreplies:
index = data.find(b'\r')
if index >= 0:
reply = data[:index]
@ -331,25 +331,25 @@ class VirtualMachine:
async def set_registers(self, **kwargs):
cmd = ''
r0 = r1 = None
register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name]
bs = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
for i, byte in enumerate(bs):
data = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(data):
if cmd:
cmd += 'z'
r1 += 1
register1 += 1
address = base + i
if r1 is None or address > r1 + 3:
if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@'
r0 = r1 = address
register0 = register1 = address
else:
cmd += 'n' * (address - r1)
r1 = address
if byte != r0:
cmd += 'n' * (address - register1)
register1 = address
if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte
register0 = byte
if cmd:
await self.issue(cmd + 's')
@ -382,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self):
await self.issue(b'D')
async def read_analog_samples(self, n, sample_width):
async def read_analog_samples(self, nsamples, sample_width):
if self._transactions:
raise TypeError("Command transaction in progress")
if sample_width == 2:
data = await self._reader.readexactly(2*n)
data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1:
data = await self._reader.readexactly(n)
if sample_width == 1:
data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data))
else:
raise ValueError(f"Bad sample width: {sample_width}")
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n):
async def read_logic_samples(self, nsamples):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self):
await self.issue(b'K')
@ -411,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self):
await self.issue(b'R')
async def wavetable_read_bytes(self, n):
async def wavetable_read_bytes(self, nbytes):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs):
async def wavetable_write_bytes(self, data):
cmd = ''
last_byte = None
for byte in bs:
for byte in data:
if byte != last_byte:
cmd += f'{byte:02x}'
cmd += 'W'