1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 11:12:09 +01:00

Compare commits

..

16 Commits

Author SHA1 Message Date
7e2b262429 Correct two howling errors in the capture logic – how did I not notice these before? 2021-06-22 13:20:06 +01:00
58aaf4c74e Merge pull request #1 from mh-/main
Fixed small problem when serial_number is not specified.
2021-02-20 11:24:12 +00:00
759828c637 Modified streams.py so that it works also with unspecified serial_number. 2021-02-14 16:16:23 +01:00
f3748a4c6a More language tweaks. 2020-07-29 18:40:02 +01:00
81eba8bc5c Fixes to clock scale selection for slow capture. 2020-07-29 18:39:46 +01:00
188db8bd76 More overdue fixes. The more I look, the more I see this shit everywhere in my code... 2020-07-24 13:49:10 +01:00
78f99c4acc Lots of linter changes. 2020-07-23 15:37:40 +01:00
829eab570a Small change to closing of scope 2020-07-23 12:38:22 +01:00
742109336b Mysterious invisible character 2020-07-20 14:00:52 +01:00
42bb240cc4 Extend README as people seem to be suddenly looking at this code; remove cruft from analysis test script. 2020-07-20 13:58:10 +01:00
b62bffe631 Analysis tweaks; split out test code for the moment 2020-07-01 19:32:06 +01:00
529a96a31b Linter fixes 2020-07-01 19:31:31 +01:00
22f7d1a81e Use timestamps starting at 0 and record scope time as capture_start in the series data 2020-07-01 19:31:18 +01:00
ca9011011d Small tweaks 2020-06-29 19:28:53 +01:00
89e0f23499 Oops! Missed one. More linter cleaning-up 2020-06-29 17:31:07 +01:00
c5222fd9b4 Tidying up a bit; work in progress on automatic analysis of results 2020-06-29 17:25:43 +01:00
8 changed files with 489 additions and 217 deletions

View File

@ -8,3 +8,58 @@
- Also requires **NumPy** and **SciPy** if you want to do analog range calibration - Also requires **NumPy** and **SciPy** if you want to do analog range calibration
- Having **Pandas** is useful for wrapping capture results for further processing - Having **Pandas** is useful for wrapping capture results for further processing
## Longer Notes
### Why have I written this?
BitScope helpfully provide applications and libraries for talking to their USB
capture devices, so one can just use those. I wrote this code because I want
to be able to grab the raw data and further process it in various ways. I'm
accustomed to working at a Python prompt with various toolkits like SciPy and
Matplotlib, so I wanted a simple way to just grab a trace as an array.
The BitScope library is pretty simple to use, but also requires you to
understand a fair amount about how the scope works and make a bunch of decisions
about what capture mode and rate to use. I want to just specify a time period,
voltage range and a rough number of samples and have all of that worked out for
me, same way as I'd use an actual oscilloscope: twiddle the knobs and look at
the trace.
Of course, I could have wrapped the BitScope library in something that would do
this, but after reading a bit about how the scope works I was fascinated with
understanding it further and so I decided to go back to first principles and
start with just talking to it with a serial library. This code thus serves as
(sort of) documentation for the VM registers, capture modes and how to use them.
It also has the advantage of being pure Python.
The code prefers the highest capture resolution possible and will do the mapping
from high/low/trigger voltages to the mysterious magic numbers that the device
needs. It can also do logic and mixed-signal capture.
In addition to capturing, the code can also generate waveforms at arbitrary
frequencies  something that is tricky to do as the device operates at specific
frequencies and so one has to massage the width of the waveform buffer to get a
frequency outside of these. It can also control the clock generator.
I've gone for an underlying async design as it makes it easy to integrate the
code into UI programs or network servers  both of which interest me as the end
purpose for this code. However, for shell use there are synchronous wrapper
functions. Of particular note is that the synchronous wrapper understands
keyboard interrupt and will cancel a capture returning the trace around the
cancel point. This is useful if your trigger doesn't fire and you want to
understand why.
### Where's the documentation, mate?
Yeah, yeah. I know.
### Also, I see no unit tests...
It's pretty hard to do unit tests for a physical device. That's my excuse and
I'm sticking to it.
### Long lines and ignoring E221, eh?
"A foolish consistency is the hobgoblin of little minds"
Also, I haven't used an 80 character wide terminal in this century.

Binary file not shown.

150
analysis.py Normal file
View File

@ -0,0 +1,150 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np
from utils import DotDict
def interpolate_min_x(f, x):
return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
def rms(f):
return np.sqrt((f ** 2).mean())
def sine_wave(n):
return np.sin(np.linspace(0, 2*np.pi, n, endpoint=False))
def triangle_wave(n):
x = np.linspace(0, 4, n, endpoint=False)
x2 = x % 2
y = np.where(x2 < 1, x2, 2 - x2)
y = np.where(x // 2 < 1, y, -y)
return y
def square_wave(n, duty=0.5):
w = int(n * duty)
return np.hstack([np.ones(w), -np.ones(n - w)])
def sawtooth_wave(n):
return 2 * (np.linspace(0.5, 1.5, n, endpoint=False) % 1) - 1
def moving_average(samples, width, mode='wrap'):
hwidth = width // 2
samples = np.take(samples, np.arange(-hwidth, len(samples)+width-hwidth), mode=mode)
cumulative = samples.cumsum()
return (cumulative[width:] - cumulative[:-width]) / width
def calculate_periodicity(series, window=0.1):
samples = np.array(series.samples, dtype='double')
window = int(len(samples) * window)
errors = np.zeros(len(samples) - window)
for i in range(1, len(errors) + 1):
errors[i-1] = rms(samples[i:] - samples[:-i])
threshold = errors.max() / 2
minima = []
for i in range(window, len(errors) - window):
p = errors[i-window:i+window].argmin()
if p == window and errors[p + i - window] < threshold:
minima.append(interpolate_min_x(errors, i))
if len(minima) <= 1:
return None
ks = np.polyfit(np.arange(0, len(minima)), minima, 1)
return ks[0] / series.sample_rate
def extract_waveform(series, period):
p = int(round(series.sample_rate * period))
n = len(series.samples) // p
if n <= 2:
return None, None, None, None
samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p
n -= 1
samples = samples[p//2:p*n + p//2] - underlying
wave = np.zeros(p)
for i in range(n):
o = i * p
wave += samples[o:o+p]
wave /= n
return wave, p//2, n, underlying
def normalize_waveform(samples, smooth=7):
n = len(samples)
smoothed = moving_average(samples, smooth)
scale = (smoothed.max() - smoothed.min()) / 2
offset = (smoothed.max() + smoothed.min()) / 2
smoothed -= offset
last_rising = first_falling = None
crossings = []
for i in range(n):
if smoothed[i-1] < 0 and smoothed[i] > 0:
last_rising = i
elif smoothed[i-1] > 0 and smoothed[i] < 0:
if last_rising is None:
first_falling = i
else:
crossings.append((i - last_rising, last_rising))
if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising))
first = min(crossings)[1]
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
def characterize_waveform(samples, crossings):
n = len(samples)
possibles = []
if len(crossings) == 1:
duty_cycle = crossings[0][1] / n
if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))
possibles.append((rms(samples - square_wave(n, duty_cycle)), 'square', duty_cycle))
possibles.sort()
return possibles
def annotate_series(series):
period = calculate_periodicity(series)
if period is not None:
waveform = DotDict(period=period, frequency=1 / period)
wave, start, count, underlying = extract_waveform(series, period)
wave, offset, scale, first, crossings = normalize_waveform(wave)
waveform.samples = wave
waveform.beginning = start + first
waveform.count = count
waveform.amplitude = scale
waveform.offset = underlying.mean() + offset
waveform.timestamps = np.arange(len(wave)) * series.sample_period
waveform.sample_period = series.sample_period
waveform.sample_rate = series.sample_rate
waveform.capture_start = series.capture_start + waveform.beginning * series.sample_period
possibles = characterize_waveform(wave, crossings)
if possibles:
error, shape, duty_cycle = possibles[0]
waveform.error = error
waveform.shape = shape
if duty_cycle is not None:
waveform.duty_cycle = duty_cycle
else:
waveform.shape = 'unknown'
series.waveform = waveform
return True
return False

232
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3 """
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse import argparse
import array import array
@ -16,9 +25,8 @@ from utils import DotDict
import vm import vm
LOG = logging.getLogger(__name__) Log = logging.getLogger(__name__)
AnalogParametersPath = Path('~/.config/scopething/analog.conf').expanduser()
ANALOG_PARAMETERS_PATH = Path('~/.config/scopething/analog.conf').expanduser()
class UsageError(Exception): class UsageError(Exception):
@ -44,8 +52,8 @@ class Scope(vm.VirtualMachine):
break break
else: else:
raise RuntimeError("No matching serial device found") raise RuntimeError("No matching serial device found")
LOG.info(f"Connecting to scope at {url}")
self.close() self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file') parts = urlparse(url, scheme='file')
if parts.scheme == 'file': if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path) self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -59,13 +67,13 @@ class Scope(vm.VirtualMachine):
return self return self
async def reset(self): async def reset(self):
LOG.info("Resetting scope") Log.info("Resetting scope")
await self.issue_reset() await self.issue_reset()
await self.issue_get_revision() await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii') revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501': if revision == 'BS000501':
self.master_clock_rate = 40000000 self.primary_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10 self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024 self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024 self.awg_sample_buffer_size = 1024
@ -75,38 +83,38 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)} self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07 self.analog_lo_min = 0.07
self.analog_hi_max = 0.88 self.analog_hi_max = 0.88
self.timeout_clock_period = (1 << 8) * self.master_clock_period self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else: else:
raise RuntimeError(f"Unsupported scope, revision: {revision}") raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False self._awg_running = False
self._clock_running = False self._clock_running = False
self.load_analog_params() self.load_analog_params()
LOG.info(f"Initialised scope, revision: {revision}") Log.info(f"Initialised scope, revision: {revision}")
def load_analog_params(self): def load_analog_params(self):
config = ConfigParser() config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH) config.read(AnalogParametersPath)
analog_params = {} analog_params = {}
for url in config.sections(): for url in config.sections():
if url == self.url: if url == self.url:
for probes in config[url]: for probes in config[url]:
params = self.AnalogParams(*map(float, config[url][probes].split())) params = self.AnalogParams(*map(float, config[url][probes].split()))
analog_params[probes] = params analog_params[probes] = params
LOG.debug(f"Loading saved parameters for {probes}: {params!r}") Log.debug(f"Loading saved parameters for {probes}: {params!r}")
if analog_params: if analog_params:
self.analog_params.update(analog_params) self.analog_params.update(analog_params)
LOG.info(f"Loaded analog parameters for probes: {', '.join(analog_params.keys())}") Log.info(f"Loaded analog parameters for probes: {', '.join(analog_params.keys())}")
def save_analog_params(self): def save_analog_params(self):
LOG.info("Saving analog parameters") Log.info("Saving analog parameters")
config = ConfigParser() config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH) config.read(AnalogParametersPath)
config[self.url] = {probes: ' '.join(map(str, self.analog_params[probes])) for probes in self.analog_params} config[self.url] = {probes: ' '.join(map(str, self.analog_params[probes])) for probes in self.analog_params}
parent = ANALOG_PARAMETERS_PATH.parent parent = AnalogParametersPath.parent
if not parent.is_dir(): if not parent.is_dir():
parent.mkdir(parents=True) parent.mkdir(parents=True)
with open(ANALOG_PARAMETERS_PATH, 'w') as parameters_file: with open(AnalogParametersPath, 'w') as parameters_file:
config.write(parameters_file) config.write(parameters_file)
def __enter__(self): def __enter__(self):
@ -116,8 +124,8 @@ class Scope(vm.VirtualMachine):
self.close() self.close()
def close(self): def close(self):
super().close() if super().close():
LOG.info("Closed scope") Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params): def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams): if not isinstance(params, self.AnalogParams):
@ -161,41 +169,43 @@ class Scope(vm.VirtualMachine):
logic_enable = sum(1 << channel for channel in logic_channels) logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes: for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples)) ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1 clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels): if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
LOG.debug(f"Considering trace mode {capture_mode.trace_mode.name}...") Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1: if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high)) clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale)) ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1): if ticks > capture_mode.clock_low:
LOG.debug(f"- try with tick count {ticks} x {clock_scale}") if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else: else:
continue continue
elif ticks >= capture_mode.clock_low: elif ticks >= capture_mode.clock_low:
if ticks > capture_mode.clock_high: if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high ticks = capture_mode.clock_high
LOG.debug(f"- try with tick count {ticks}") Log.debug(f"- try with tick count {ticks}")
else: else:
LOG.debug("- mode too slow") Log.debug("- mode too slow")
continue continue
n = int(round(period / self.master_clock_period / ticks / clock_scale)) actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2: if len(analog_channels) == 2:
n -= n % 2 actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels: if logic_channels and analog_channels:
buffer_width //= 2 buffer_width //= 2
if n <= buffer_width: if actual_nsamples <= buffer_width:
LOG.debug(f"- OK; period is {n} samples") Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = n nsamples = actual_nsamples
break break
LOG.debug(f"- insufficient buffer space for necessary {n} samples") Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else: else:
raise ConfigurationError("Unable to find appropriate capture mode") raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6: if trigger_position and sample_rate > 5e6:
LOG.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0") Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0 trigger_position = 0
if raw: if raw:
@ -206,11 +216,11 @@ class Scope(vm.VirtualMachine):
if low is None: if low is None:
low = analog_params.safe_low if analog_channels else self.logic_low low = analog_params.safe_low if analog_channels else self.logic_low
elif low < analog_params.safe_low: elif low < analog_params.safe_low:
LOG.warning(f"Voltage range is below safe minimum: {low} < {analog_params.safe_low}") Log.warning(f"Voltage range is below safe minimum: {low} < {analog_params.safe_low}")
if high is None: if high is None:
high = analog_params.safe_high if analog_channels else self.logic_high high = analog_params.safe_high if analog_channels else self.logic_high
elif high > analog_params.safe_high: elif high > analog_params.safe_high:
LOG.warning(f"Voltage range is above safe maximum: {high} > {analog_params.safe_high}") Log.warning(f"Voltage range is above safe maximum: {high} > {analog_params.safe_high}")
lo, hi = self.calculate_lo_hi(low, high, analog_params) lo, hi = self.calculate_lo_hi(low, high, analog_params)
spock_option = vm.SpockOption.TriggerTypeHardwareComparator spock_option = vm.SpockOption.TriggerTypeHardwareComparator
@ -225,23 +235,14 @@ class Scope(vm.VirtualMachine):
kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable
if trigger_level is None: if trigger_level is None:
trigger_level = (high + low) / 2 trigger_level = (high + low) / 2
if not raw: analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if isinstance(trigger, dict):
if trigger == 'A' or trigger == 'B':
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
trigger_logic = 0 trigger_logic = 0
trigger_mask = 0xff trigger_mask = 0xff
for channel, value in trigger.items(): for channel, value in trigger.items():
if isinstance(channel, str): if isinstance(channel, str):
if channel.startswith('L'): if channel.startswith('L'):
channel = int(channel[1:]) channel = int(channel[1:]) # noqa
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7: if channel < 0 or channel > 7:
@ -250,6 +251,14 @@ class Scope(vm.VirtualMachine):
trigger_mask &= ~mask trigger_mask &= ~mask
if value: if value:
trigger_logic |= mask trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower() trigger_type = trigger_type.lower()
@ -265,20 +274,19 @@ class Scope(vm.VirtualMachine):
if timeout is None: if timeout is None:
trigger_timeout = 0 trigger_timeout = 0
else: else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period)) + timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value: if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0: if timeout > 0:
raise ConfigurationError("Required trigger timeout too long") raise ConfigurationError("Required trigger timeout too long")
else: raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
LOG.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture " Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})") f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
async with self.transaction(): async with self.transaction():
await self.set_registers(TraceMode=capture_mode.trace_mode, BufferMode=capture_mode.buffer_mode, await self.set_registers(TraceMode=capture_mode.trace_mode, BufferMode=capture_mode.buffer_mode,
SampleAddress=0, ClockTicks=ticks, ClockScale=clock_scale, SampleAddress=0, ClockTicks=ticks, ClockScale=clock_scale,
TriggerLevel=trigger_level, TriggerLogic=trigger_logic, TriggerMask=trigger_mask, TriggerLevel=analog_trigger_level, TriggerLogic=trigger_logic, TriggerMask=trigger_mask,
TraceIntro=trace_intro, TraceOutro=trace_outro, TraceDelay=0, Timeout=trigger_timeout, TraceIntro=trace_intro, TraceOutro=trace_outro, TraceDelay=0, Timeout=trigger_timeout,
TriggerIntro=trigger_intro//2, TriggerOutro=trigger_outro//2, Prelude=0, TriggerIntro=trigger_intro//2, TriggerOutro=trigger_outro//2, Prelude=0,
SpockOption=spock_option, ConverterLo=lo, ConverterHi=hi, SpockOption=spock_option, ConverterLo=lo, ConverterHi=hi,
@ -304,7 +312,8 @@ class Scope(vm.VirtualMachine):
address -= address % 2 address -= address % 2
traces = DotDict() traces = DotDict()
timestamps = array.array('d', (t*self.master_clock_period for t in range(start_timestamp, timestamp, ticks*clock_scale)))
timestamps = array.array('d', (i * sample_period for i in range(nsamples)))
for dump_channel, channel in enumerate(sorted(analog_channels)): for dump_channel, channel in enumerate(sorted(analog_channels)):
asamples = nsamples // len(analog_channels) asamples = nsamples // len(analog_channels)
async with self.transaction(): async with self.transaction():
@ -315,11 +324,18 @@ class Scope(vm.VirtualMachine):
await self.issue_analog_dump_binary() await self.issue_analog_dump_binary()
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1)) value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width) data = await self.read_analog_samples(asamples, capture_mode.sample_width)
traces[channel] = DotDict({'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps, series = DotDict({'channel': channel,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)), 'capture_start': start_timestamp * self.primary_clock_period,
'sample_period': sample_period*len(analog_channels), 'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'sample_rate': sample_rate/len(analog_channels), 'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'cause': cause}) 'sample_period': sample_period*len(analog_channels),
'sample_rate': sample_rate/len(analog_channels),
'cause': cause})
if cause == 'trigger' and channel == trigger:
series.trigger_timestamp = series.timestamps[trigger_samples // len(analog_channels)]
series.trigger_level = trigger_level
series.trigger_type = trigger_type
traces[channel] = series
if logic_channels: if logic_channels:
async with self.transaction(): async with self.transaction():
await self.set_registers(SampleAddress=(address - nsamples) % buffer_width, await self.set_registers(SampleAddress=(address - nsamples) % buffer_width,
@ -329,12 +345,20 @@ class Scope(vm.VirtualMachine):
data = await self.read_logic_samples(nsamples) data = await self.read_logic_samples(nsamples)
for i in logic_channels: for i in logic_channels:
mask = 1 << i mask = 1 << i
traces[f'L{i}'] = DotDict({'timestamps': timestamps, channel = f'L{i}'
'samples': array.array('B', (1 if value & mask else 0 for value in data)), series = DotDict({'channel': channel,
'sample_period': sample_period, 'capture_start': start_timestamp * self.primary_clock_period,
'sample_rate': sample_rate, 'timestamps': timestamps,
'cause': cause}) 'samples': array.array('B', (1 if value & mask else 0 for value in data)),
LOG.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}") 'sample_period': sample_period,
'sample_rate': sample_rate,
'cause': cause})
if cause == 'trigger' and isinstance(trigger, dict) and i in trigger:
series.trigger_timestamp = series.timestamps[trigger_samples]
series.trigger_level = trigger[i]
series.trigger_type = trigger_type
traces[channel] = series
Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
return traces return traces
async def start_waveform(self, frequency, waveform='sine', ratio=0.5, low=0, high=None, min_samples=50, max_error=1e-4): async def start_waveform(self, frequency, waveform='sine', ratio=0.5, low=0, high=None, min_samples=50, max_error=1e-4):
@ -346,25 +370,25 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})") raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high: if low < 0 or low > high:
raise ValueError("low out of range (0-high)") raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples))) max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size))) min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None best_solution = None
for clock in range(min_clock, max_clock+1): for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width) nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width)) size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency: if actualf == frequency:
LOG.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}") Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break break
error = abs(frequency - actualf) / frequency error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]): if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf best_solution = error, size, nwaves, clock, actualf
else: else:
if best_solution is None: if best_solution is None:
raise ConfigurationError("No solution to required frequency/min_samples/max_error") raise ConfigurationError("No solution to required frequency/min_samples/max_error")
error, size, nwaves, clock, actualf = best_solution error, size, nwaves, clock, actualf = best_solution
LOG.debug(f"Best solution: size={size} nwaves={nwaves} clock={clock} actualf={actualf}") Log.debug(f"Best solution: size={size} nwaves={nwaves} clock={clock} actualf={actualf}")
async with self.transaction(): async with self.transaction():
if isinstance(waveform, str): if isinstance(waveform, str):
mode = {'sine': 0, 'triangle': 1, 'exponential': 2, 'square': 3}[waveform.lower()] mode = {'sine': 0, 'triangle': 1, 'exponential': 2, 'square': 3}[waveform.lower()]
@ -391,7 +415,7 @@ class Scope(vm.VirtualMachine):
await self.set_registers(KitchenSinkB=vm.KitchenSinkB.WaveformGeneratorEnable) await self.set_registers(KitchenSinkB=vm.KitchenSinkB.WaveformGeneratorEnable)
await self.issue_configure_device_hardware() await self.issue_configure_device_hardware()
self._awg_running = True self._awg_running = True
LOG.info(f"Signal generator running at {actualf:0.1f}Hz") Log.info(f"Signal generator running at {actualf:0.1f}Hz")
return actualf return actualf
async def stop_waveform(self): async def stop_waveform(self):
@ -402,22 +426,22 @@ class Scope(vm.VirtualMachine):
await self.issue_control_clock_generator() await self.issue_control_clock_generator()
await self.set_registers(KitchenSinkB=0) await self.set_registers(KitchenSinkB=0)
await self.issue_configure_device_hardware() await self.issue_configure_device_hardware()
LOG.info("Signal generator stopped") Log.info("Signal generator stopped")
self._awg_running = False self._awg_running = False
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4): async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running: if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use") raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value) ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1) fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error: if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error") raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction(): async with self.transaction():
await self.set_registers(Map5=0x12, Clock=ticks, Rise=0, Fall=fall, Control=0x80, Cmd=3, Mode=0) await self.set_registers(Map5=0x12, Clock=ticks, Rise=0, Fall=fall, Control=0x80, Cmd=3, Mode=0)
await self.issue_control_clock_generator() await self.issue_control_clock_generator()
self._clock_running = True self._clock_running = True
LOG.info(f"Clock generator running at {actualf:0.1f}Hz, {actualr*100:.0f}% duty cycle") Log.info(f"Clock generator running at {actualf:0.1f}Hz, {actualr*100:.0f}% duty cycle")
return actualf, actualr return actualf, actualr
async def stop_clock(self): async def stop_clock(self):
@ -426,7 +450,7 @@ class Scope(vm.VirtualMachine):
async with self.transaction(): async with self.transaction():
await self.set_registers(Map5=0, Cmd=1, Mode=0) await self.set_registers(Map5=0, Cmd=1, Mode=0)
await self.issue_control_clock_generator() await self.issue_control_clock_generator()
LOG.info("Clock generator stopped") Log.info("Clock generator stopped")
self._clock_running = False self._clock_running = False
async def calibrate(self, probes='x1', n=32, save=True): async def calibrate(self, probes='x1', n=32, save=True):
@ -477,15 +501,15 @@ class Scope(vm.VirtualMachine):
full = (full + 1) / 3 full = (full + 1) / 3
analog_scale = self.clock_voltage / (full - zero) analog_scale = self.clock_voltage / (full - zero)
analog_offset = -zero * analog_scale analog_offset = -zero * analog_scale
LOG.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V") Log.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V")
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False): for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n): for hi in np.linspace(self.analog_hi_max, 0.5, n):
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0) zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero: if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero) analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range)) items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock() await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T lo, hi, low, high, offset = np.array(items).T # noqa
def f(params): def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None)) dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
@ -497,48 +521,46 @@ class Scope(vm.VirtualMachine):
constraints=[{'type': 'eq', 'fun': lambda x: x[0]*1/3 + x[1]*2/3 + x[2] - 1/3}, constraints=[{'type': 'eq', 'fun': lambda x: x[0]*1/3 + x[1]*2/3 + x[2] - 1/3},
{'type': 'eq', 'fun': lambda x: x[3]*2/3 + x[4]*1/3 + x[5] - 2/3}]) {'type': 'eq', 'fun': lambda x: x[3]*2/3 + x[4]*1/3 + x[5] - 2/3}])
if result.success: if result.success:
LOG.info(f"Calibration succeeded: {result.message}") Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None) params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x): def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params) lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2) return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
safe_low, safe_high = minimize(f, (low[0], high[0])).x safe_low, safe_high = minimize(f, (low[0], high[0])).x
offset_mean = offset.mean() offset_mean = offset.mean()
params = self.analog_params[probes] = self.AnalogParams(*result.x, analog_scale, analog_offset, safe_low, safe_high, offset_mean) params = self.analog_params[probes] = self.AnalogParams(*result.x, analog_scale, analog_offset, safe_low, safe_high, offset_mean)
LOG.info(f"{params!r} ±{100*offset.std()/offset_mean:.1f}%)") Log.info(f"{params!r} ±{100*offset.std()/offset_mean:.1f}%)")
clo, chi = self.calculate_lo_hi(low, high, params) clo, chi = self.calculate_lo_hi(low, high, params)
lo_error = np.sqrt((((clo-lo)/(hi-lo))**2).mean()) lo_error = np.sqrt((((clo-lo)/(hi-lo))**2).mean())
hi_error = np.sqrt((((chi-hi)/(hi-lo))**2).mean()) hi_error = np.sqrt((((chi-hi)/(hi-lo))**2).mean())
LOG.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps") Log.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps")
if save: if save:
self.save_analog_params() self.save_analog_params()
else: else:
LOG.warning(f"Calibration failed: {result.message}") Log.warning(f"Calibration failed: {result.message}")
return result.success return result.success
def __repr__(self): def __repr__(self):
return f"<Scope {self.url}>" return f"<Scope {self.url}>"
""" # $ ipython3 --pylab
$ ipython3 --pylab # Using matplotlib backend: MacOSX
Using matplotlib backend: MacOSX #
# In [1]: run scope
In [1]: run scope #
# In [2]: start_waveform(2000, 'triangle')
In [2]: start_waveform(2000, 'triangle') # Out[2]: 2000.0
Out[2]: 2000.0 #
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3) #
# In [4]: plot(traces.A.timestamps, traces.A.samples)
In [4]: plot(traces.A.timestamps, traces.A.samples) # Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>] #
# In [5]: plot(traces.B.timestamps, traces.B.samples)
In [5]: plot(traces.B.timestamps, traces.B.samples) # Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
async def main(): async def main():

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO. Package for asynchronous serial IO.
""" """
# pylama:ignore=W1203,R0916,W0703
import asyncio import asyncio
import logging import logging
import sys import sys
@ -14,20 +16,20 @@ import serial
from serial.tools.list_ports import comports from serial.tools.list_ports import comports
LOG = logging.getLogger(__name__) Log = logging.getLogger(__name__)
class SerialStream: class SerialStream:
@classmethod @classmethod
def devices_matching(cls, vid=None, pid=None, serial=None): def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports(): for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number): if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device yield port.device
@classmethod @classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs): def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial): for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs) return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device") raise RuntimeError("No matching serial device")
@ -36,7 +38,7 @@ class SerialStream:
self._use_threads = sys.platform == 'win32' if use_threads is None else use_threads self._use_threads = sys.platform == 'win32' if use_threads is None else use_threads
self._connection = serial.Serial(self._device, **kwargs) if self._use_threads else \ self._connection = serial.Serial(self._device, **kwargs) if self._use_threads else \
serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs) serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
LOG.debug(f"Opened SerialStream on {device}") Log.debug(f"Opened SerialStream on {device}")
self._loop = loop if loop is not None else asyncio.get_event_loop() self._loop = loop if loop is not None else asyncio.get_event_loop()
self._output_buffer = bytes() self._output_buffer = bytes()
self._output_buffer_empty = None self._output_buffer_empty = None
@ -59,15 +61,15 @@ class SerialStream:
return return
if not self._output_buffer: if not self._output_buffer:
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception: except Exception:
LOG.exception("Error writing to stream") Log.exception("Error writing to stream")
raise raise
if n: if nbytes:
LOG.debug(f"Write {data[:n]!r}") Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[n:] self._output_buffer = data[nbytes:]
else: else:
self._output_buffer += data self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None: if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self): def _feed_data(self):
try: try:
n = self._connection.write(self._output_buffer) nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception as e: except Exception as exc:
LOG.exception("Error writing to stream") Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e) self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
if n: if nbytes:
LOG.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer: if not self._output_buffer:
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None) self._output_buffer_empty.set_result(None)
@ -101,40 +103,39 @@ class SerialStream:
data = bytes(self._output_buffer) data = bytes(self._output_buffer)
self._output_buffer_lock.release() self._output_buffer_lock.release()
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
finally: finally:
self._output_buffer_lock.acquire() self._output_buffer_lock.acquire()
LOG.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None self._output_buffer_empty = None
async def read(self, n=None): async def read(self, nbytes=None):
if self._use_threads: if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n) return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True: while True:
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w: if nwaiting:
data = self._connection.read(w if n is None else min(n, w)) data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
LOG.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
else: future = self._loop.create_future()
future = self._loop.create_future() self._loop.add_reader(self._connection, future.set_result, None)
self._loop.add_reader(self._connection, future.set_result, None) try:
try: await future
await future finally:
finally: self._loop.remove_reader(self._connection)
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None): def _read_blocking(self, nbytes=None):
data = self._connection.read(1) data = self._connection.read(1)
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w and (n is None or n > 1): if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(w if n is None else min(n-1, w)) data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
LOG.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
async def readexactly(self, n): async def readexactly(self, nbytes):
data = b'' data = b''
while len(data) < n: while len(data) < nbytes:
data += await self.read(n-len(data)) data += await self.read(nbytes-len(data))
return data return data

27
test.py Normal file
View File

@ -0,0 +1,27 @@
from pylab import figure, plot, show
from analysis import annotate_series
from scope import await_, capture, main
await_(main())
series = capture(['A'], period=20e-3, nsamples=2000).A
figure(1)
plot(series.timestamps, series.samples)
if annotate_series(series):
waveform = series.waveform
if 'duty_cycle' in waveform:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
else:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with amplitude ±{waveform.amplitude:.2f}V and offset {waveform.offset:.2f}V")
plot(waveform.timestamps + waveform.capture_start - series.capture_start,
waveform.samples * waveform.amplitude + waveform.offset)
show()

View File

@ -1,8 +1,12 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict): class DotDict(dict):
__getattr__ = dict.__getitem__ __getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__ __setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__ __delattr__ = dict.__delitem__

145
vm.py
View File

@ -1,4 +1,3 @@
""" """
vm vm
== ==
@ -7,14 +6,15 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data. commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B]. document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY [VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
""" """
# pylama:ignore=E221,C0326,R0904,W1203
import array import array
import asyncio
from collections import namedtuple from collections import namedtuple
from enum import IntEnum from enum import IntEnum
import logging import logging
@ -23,7 +23,7 @@ import struct
from utils import DotDict from utils import DotDict
LOG = logging.getLogger(__name__) Log = logging.getLogger(__name__)
class Register(namedtuple('Register', ['base', 'dtype', 'description'])): class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
@ -36,30 +36,32 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else: else:
width = int(self.dtype[1:]) width = int(self.dtype[1:])
if sign == 'U': if sign == 'U':
n = 1 << width max_value = (1 << width) - 1
value = max(0, min(value, n-1)) value = min(max(0, value), max_value)
bs = struct.pack('<I', value) data = struct.pack('<I', value)
elif sign == 'S': elif sign == 'S':
n = 1 << (width - 1) max_value = (1 << (width - 1))
value = max(-n, min(value, n-1)) value = min(max(-max_value, value), max_value - 1)
bs = struct.pack('<i', value) data = struct.pack('<i', value)
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
return bs[:width//8] return data[:width//8]
def decode(self, bs):
if len(bs) < 4: def decode(self, data):
bs = bs + bytes(4 - len(bs)) if len(data) < 4:
data = data + bytes(4 - len(data))
sign = self.dtype[0] sign = self.dtype[0]
if sign == 'U': if sign == 'U':
value = struct.unpack('<I', bs)[0] value = struct.unpack('<I', data)[0]
elif sign == 'S': elif sign == 'S':
value = struct.unpack('<i', bs)[0] value = struct.unpack('<i', data)[0]
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
if '.' in self.dtype: if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1)) fraction = int(self.dtype.split('.', 1)[1])
value = value / (1 << fraction) value /= (1 << fraction)
return value return value
@property @property
def maximum_value(self): def maximum_value(self):
if '.' in self.dtype: if '.' in self.dtype:
@ -68,14 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0 whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S': if self.dtype[0] == 'S':
whole -= 1 whole -= 1
n = (1<<(whole+fraction)) - 1 max_value = (1 << (whole+fraction)) - 1
return n / (1<<fraction) if fraction else n return max_value / (1 << fraction) if fraction else max_value
@property @property
def width(self): def width(self):
if '.' in self.dtype: if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8 return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else: return int(self.dtype[1:]) // 8
return int(self.dtype[1:]) // 8
Registers = DotDict({ Registers = DotDict({
@ -88,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"), "TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"), "TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"), "TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"), "ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"), "ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"), "TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"), "TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"), "TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -149,19 +151,20 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"), "Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"), "Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"), "Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"), "PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"), "PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
}) })
class TraceMode(IntEnum): class TraceMode(IntEnum):
Analog = 0 Analog = 0
Mixed = 1 Mixed = 1
AnalogChop = 2 AnalogChop = 2
MixedChop = 3 MixedChop = 3
AnalogFast = 4 AnalogFast = 4
MixedFast = 5 MixedFast = 5
AnalogFastChop = 6 AnalogFastChop = 6
MixedFastChop = 7 MixedFastChop = 7
AnalogShot = 11 AnalogShot = 11
MixedShot = 12 MixedShot = 12
LogicShot = 13 LogicShot = 13
@ -172,6 +175,7 @@ class TraceMode(IntEnum):
Macro = 18 Macro = 18
MacroChop = 19 MacroChop = 19
class BufferMode(IntEnum): class BufferMode(IntEnum):
Single = 0 Single = 0
Chop = 1 Chop = 1
@ -180,6 +184,7 @@ class BufferMode(IntEnum):
Macro = 4 Macro = 4
MacroChop = 5 MacroChop = 5
class DumpMode(IntEnum): class DumpMode(IntEnum):
Raw = 0 Raw = 0
Burst = 1 Burst = 1
@ -190,6 +195,7 @@ class DumpMode(IntEnum):
Filter = 6 Filter = 6
Span = 7 Span = 7
class SpockOption(IntEnum): class SpockOption(IntEnum):
TriggerInvert = 0x40 TriggerInvert = 0x40
TriggerSourceA = 0x04 * 0 TriggerSourceA = 0x04 * 0
@ -198,23 +204,28 @@ class SpockOption(IntEnum):
TriggerTypeSampledAnalog = 0x01 * 0 TriggerTypeSampledAnalog = 0x01 * 0
TriggerTypeHardwareComparator = 0x01 * 1 TriggerTypeHardwareComparator = 0x01 * 1
class KitchenSinkA(IntEnum): class KitchenSinkA(IntEnum):
ChannelAComparatorEnable = 0x80 ChannelAComparatorEnable = 0x80
ChannelBComparatorEnable = 0x40 ChannelBComparatorEnable = 0x40
class KitchenSinkB(IntEnum): class KitchenSinkB(IntEnum):
AnalogFilterEnable = 0x80 AnalogFilterEnable = 0x80
WaveformGeneratorEnable = 0x40 WaveformGeneratorEnable = 0x40
class TraceStatus(IntEnum): class TraceStatus(IntEnum):
Done = 0x00 Done = 0x00
Auto = 0x01 Auto = 0x01
Wait = 0x02 Wait = 0x02
Stop = 0x03 Stop = 0x03
CaptureMode = namedtuple('CaptureMode', ('trace_mode', 'clock_low', 'clock_high', 'clock_divide', CaptureMode = namedtuple('CaptureMode', ('trace_mode', 'clock_low', 'clock_high', 'clock_divide',
'analog_channels', 'sample_width', 'logic_channels', 'buffer_mode')) 'analog_channels', 'sample_width', 'logic_channels', 'buffer_mode'))
CaptureModes = [ CaptureModes = [
CaptureMode(TraceMode.Macro, 40, 16384, 1, 1, 2, False, BufferMode.Macro), CaptureMode(TraceMode.Macro, 40, 16384, 1, 1, 2, False, BufferMode.Macro),
CaptureMode(TraceMode.MacroChop, 40, 16384, 1, 2, 2, False, BufferMode.MacroChop), CaptureMode(TraceMode.MacroChop, 40, 16384, 1, 2, 2, False, BufferMode.MacroChop),
@ -241,12 +252,15 @@ class VirtualMachine:
class Transaction: class Transaction:
def __init__(self, vm): def __init__(self, vm):
self._vm = vm self._vm = vm
self._data = b''
def append(self, cmd): def append(self, cmd):
self._data += cmd self._data += cmd
async def __aenter__(self): async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self) self._vm._transactions.append(self)
return self return self
async def __aexit__(self, exc_type, exc_value, traceback): async def __aexit__(self, exc_type, exc_value, traceback):
if self._vm._transactions.pop() != self: if self._vm._transactions.pop() != self:
raise RuntimeError("Mis-ordered transactions") raise RuntimeError("Mis-ordered transactions")
@ -265,6 +279,8 @@ class VirtualMachine:
self._writer.close() self._writer.close()
self._writer = None self._writer = None
self._reader = None self._reader = None
return True
return False
__del__ = close __del__ = close
@ -275,7 +291,7 @@ class VirtualMachine:
if isinstance(cmd, str): if isinstance(cmd, str):
cmd = cmd.encode('ascii') cmd = cmd.encode('ascii')
if not self._transactions: if not self._transactions:
LOG.debug(f"Issue: {cmd!r}") Log.debug(f"Issue: {cmd!r}")
self._writer.write(cmd) self._writer.write(cmd)
await self._writer.drain() await self._writer.drain()
echo = await self._reader.readexactly(len(cmd)) echo = await self._reader.readexactly(len(cmd))
@ -284,16 +300,16 @@ class VirtualMachine:
else: else:
self._transactions[-1].append(cmd) self._transactions[-1].append(cmd)
async def read_replies(self, n): async def read_replies(self, nreplies):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
replies = [] replies = []
data, self._reply_buffer = self._reply_buffer, b'' data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n: while len(replies) < nreplies:
index = data.find(b'\r') index = data.find(b'\r')
if index >= 0: if index >= 0:
reply = data[:index] reply = data[:index]
LOG.debug(f"Read reply: {reply!r}") Log.debug(f"Read reply: {reply!r}")
replies.append(reply) replies.append(reply)
data = data[index+1:] data = data[index+1:]
else: else:
@ -305,35 +321,35 @@ class VirtualMachine:
async def issue_reset(self): async def issue_reset(self):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
LOG.debug("Issue reset") Log.debug("Issue reset")
self._writer.write(b'!') self._writer.write(b'!')
await self._writer.drain() await self._writer.drain()
while not (await self._reader.read(1000)).endswith(b'!'): while not (await self._reader.read(1000)).endswith(b'!'):
pass pass
self._reply_buffer = b'' self._reply_buffer = b''
LOG.debug("Reset complete") Log.debug("Reset complete")
async def set_registers(self, **kwargs): async def set_registers(self, **kwargs):
cmd = '' cmd = ''
r0 = r1 = None register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs): for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name] register = Registers[name]
bs = register.encode(kwargs[name]) data = register.encode(kwargs[name])
LOG.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}") Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(bs): for i, byte in enumerate(data):
if cmd: if cmd:
cmd += 'z' cmd += 'z'
r1 += 1 register1 += 1
address = base + i address = base + i
if r1 is None or address > r1 + 3: if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@' cmd += f'{address:02x}@'
r0 = r1 = address register0 = register1 = address
else: else:
cmd += 'n' * (address - r1) cmd += 'n' * (address - register1)
r1 = address register1 = address
if byte != r0: if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}' cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte register0 = byte
if cmd: if cmd:
await self.issue(cmd + 's') await self.issue(cmd + 's')
@ -366,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self): async def issue_triggered_trace(self):
await self.issue(b'D') await self.issue(b'D')
async def read_analog_samples(self, n, sample_width): async def read_analog_samples(self, nsamples, sample_width):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
if sample_width == 2: if sample_width == 2:
data = await self._reader.readexactly(2*n) data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data))) return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1: if sample_width == 1:
data = await self._reader.readexactly(n) data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data)) return array.array('f', (value/256 for value in data))
else: raise ValueError(f"Bad sample width: {sample_width}")
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n): async def read_logic_samples(self, nsamples):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self): async def issue_cancel_trace(self):
await self.issue(b'K') await self.issue(b'K')
@ -395,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self): async def issue_wavetable_read(self):
await self.issue(b'R') await self.issue(b'R')
async def wavetable_read_bytes(self, n): async def wavetable_read_bytes(self, nbytes):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs): async def wavetable_write_bytes(self, data):
cmd = '' cmd = ''
last_byte = None last_byte = None
for byte in bs: for byte in data:
if byte != last_byte: if byte != last_byte:
cmd += f'{byte:02x}' cmd += f'{byte:02x}'
cmd += 'W' cmd += 'W'
@ -425,5 +440,3 @@ class VirtualMachine:
async def issue_write_eeprom(self): async def issue_write_eeprom(self):
await self.issue(b'w') await self.issue(b'w')