1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 11:12:09 +01:00

Compare commits

..

18 Commits

Author SHA1 Message Date
7e2b262429 Correct two howling errors in the capture logic – how did I not notice these before? 2021-06-22 13:20:06 +01:00
58aaf4c74e Merge pull request #1 from mh-/main
Fixed small problem when serial_number is not specified.
2021-02-20 11:24:12 +00:00
759828c637 Modified streams.py so that it works also with unspecified serial_number. 2021-02-14 16:16:23 +01:00
f3748a4c6a More language tweaks. 2020-07-29 18:40:02 +01:00
81eba8bc5c Fixes to clock scale selection for slow capture. 2020-07-29 18:39:46 +01:00
188db8bd76 More overdue fixes. The more I look, the more I see this shit everywhere in my code... 2020-07-24 13:49:10 +01:00
78f99c4acc Lots of linter changes. 2020-07-23 15:37:40 +01:00
829eab570a Small change to closing of scope 2020-07-23 12:38:22 +01:00
742109336b Mysterious invisible character 2020-07-20 14:00:52 +01:00
42bb240cc4 Extend README as people seem to be suddenly looking at this code; remove cruft from analysis test script. 2020-07-20 13:58:10 +01:00
b62bffe631 Analysis tweaks; split out test code for the moment 2020-07-01 19:32:06 +01:00
529a96a31b Linter fixes 2020-07-01 19:31:31 +01:00
22f7d1a81e Use timestamps starting at 0 and record scope time as capture_start in the series data 2020-07-01 19:31:18 +01:00
ca9011011d Small tweaks 2020-06-29 19:28:53 +01:00
89e0f23499 Oops! Missed one. More linter cleaning-up 2020-06-29 17:31:07 +01:00
c5222fd9b4 Tidying up a bit; work in progress on automatic analysis of results 2020-06-29 17:25:43 +01:00
751cacba6d Historic bug; saner method naming; linter fix 2020-06-27 14:51:12 +01:00
8c522205e6 Linter fixes 2020-06-27 14:26:33 +01:00
8 changed files with 536 additions and 255 deletions

View File

@ -8,3 +8,58 @@
- Also requires **NumPy** and **SciPy** if you want to do analog range calibration
- Having **Pandas** is useful for wrapping capture results for further processing
## Longer Notes
### Why have I written this?
BitScope helpfully provide applications and libraries for talking to their USB
capture devices, so one can just use those. I wrote this code because I want
to be able to grab the raw data and further process it in various ways. I'm
accustomed to working at a Python prompt with various toolkits like SciPy and
Matplotlib, so I wanted a simple way to just grab a trace as an array.
The BitScope library is pretty simple to use, but also requires you to
understand a fair amount about how the scope works and make a bunch of decisions
about what capture mode and rate to use. I want to just specify a time period,
voltage range and a rough number of samples and have all of that worked out for
me, same way as I'd use an actual oscilloscope: twiddle the knobs and look at
the trace.
Of course, I could have wrapped the BitScope library in something that would do
this, but after reading a bit about how the scope works I was fascinated with
understanding it further and so I decided to go back to first principles and
start with just talking to it with a serial library. This code thus serves as
(sort of) documentation for the VM registers, capture modes and how to use them.
It also has the advantage of being pure Python.
The code prefers the highest capture resolution possible and will do the mapping
from high/low/trigger voltages to the mysterious magic numbers that the device
needs. It can also do logic and mixed-signal capture.
In addition to capturing, the code can also generate waveforms at arbitrary
frequencies  something that is tricky to do as the device operates at specific
frequencies and so one has to massage the width of the waveform buffer to get a
frequency outside of these. It can also control the clock generator.
I've gone for an underlying async design as it makes it easy to integrate the
code into UI programs or network servers  both of which interest me as the end
purpose for this code. However, for shell use there are synchronous wrapper
functions. Of particular note is that the synchronous wrapper understands
keyboard interrupt and will cancel a capture returning the trace around the
cancel point. This is useful if your trigger doesn't fire and you want to
understand why.
### Where's the documentation, mate?
Yeah, yeah. I know.
### Also, I see no unit tests...
It's pretty hard to do unit tests for a physical device. That's my excuse and
I'm sticking to it.
### Long lines and ignoring E221, eh?
"A foolish consistency is the hobgoblin of little minds"
Also, I haven't used an 80 character wide terminal in this century.

Binary file not shown.

150
analysis.py Normal file
View File

@ -0,0 +1,150 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np
from utils import DotDict
def interpolate_min_x(f, x):
return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
def rms(f):
return np.sqrt((f ** 2).mean())
def sine_wave(n):
return np.sin(np.linspace(0, 2*np.pi, n, endpoint=False))
def triangle_wave(n):
x = np.linspace(0, 4, n, endpoint=False)
x2 = x % 2
y = np.where(x2 < 1, x2, 2 - x2)
y = np.where(x // 2 < 1, y, -y)
return y
def square_wave(n, duty=0.5):
w = int(n * duty)
return np.hstack([np.ones(w), -np.ones(n - w)])
def sawtooth_wave(n):
return 2 * (np.linspace(0.5, 1.5, n, endpoint=False) % 1) - 1
def moving_average(samples, width, mode='wrap'):
hwidth = width // 2
samples = np.take(samples, np.arange(-hwidth, len(samples)+width-hwidth), mode=mode)
cumulative = samples.cumsum()
return (cumulative[width:] - cumulative[:-width]) / width
def calculate_periodicity(series, window=0.1):
samples = np.array(series.samples, dtype='double')
window = int(len(samples) * window)
errors = np.zeros(len(samples) - window)
for i in range(1, len(errors) + 1):
errors[i-1] = rms(samples[i:] - samples[:-i])
threshold = errors.max() / 2
minima = []
for i in range(window, len(errors) - window):
p = errors[i-window:i+window].argmin()
if p == window and errors[p + i - window] < threshold:
minima.append(interpolate_min_x(errors, i))
if len(minima) <= 1:
return None
ks = np.polyfit(np.arange(0, len(minima)), minima, 1)
return ks[0] / series.sample_rate
def extract_waveform(series, period):
p = int(round(series.sample_rate * period))
n = len(series.samples) // p
if n <= 2:
return None, None, None, None
samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p
n -= 1
samples = samples[p//2:p*n + p//2] - underlying
wave = np.zeros(p)
for i in range(n):
o = i * p
wave += samples[o:o+p]
wave /= n
return wave, p//2, n, underlying
def normalize_waveform(samples, smooth=7):
n = len(samples)
smoothed = moving_average(samples, smooth)
scale = (smoothed.max() - smoothed.min()) / 2
offset = (smoothed.max() + smoothed.min()) / 2
smoothed -= offset
last_rising = first_falling = None
crossings = []
for i in range(n):
if smoothed[i-1] < 0 and smoothed[i] > 0:
last_rising = i
elif smoothed[i-1] > 0 and smoothed[i] < 0:
if last_rising is None:
first_falling = i
else:
crossings.append((i - last_rising, last_rising))
if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising))
first = min(crossings)[1]
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
def characterize_waveform(samples, crossings):
n = len(samples)
possibles = []
if len(crossings) == 1:
duty_cycle = crossings[0][1] / n
if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))
possibles.append((rms(samples - square_wave(n, duty_cycle)), 'square', duty_cycle))
possibles.sort()
return possibles
def annotate_series(series):
period = calculate_periodicity(series)
if period is not None:
waveform = DotDict(period=period, frequency=1 / period)
wave, start, count, underlying = extract_waveform(series, period)
wave, offset, scale, first, crossings = normalize_waveform(wave)
waveform.samples = wave
waveform.beginning = start + first
waveform.count = count
waveform.amplitude = scale
waveform.offset = underlying.mean() + offset
waveform.timestamps = np.arange(len(wave)) * series.sample_period
waveform.sample_period = series.sample_period
waveform.sample_rate = series.sample_rate
waveform.capture_start = series.capture_start + waveform.beginning * series.sample_period
possibles = characterize_waveform(wave, crossings)
if possibles:
error, shape, duty_cycle = possibles[0]
waveform.error = error
waveform.shape = shape
if duty_cycle is not None:
waveform.duty_cycle = duty_cycle
else:
waveform.shape = 'unknown'
series.waveform = waveform
return True
return False

304
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3
"""
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse
import array
@ -7,7 +16,6 @@ from collections import namedtuple
from configparser import ConfigParser
import logging
import math
import os
from pathlib import Path
import sys
from urllib.parse import urlparse
@ -17,14 +25,14 @@ from utils import DotDict
import vm
LOG = logging.getLogger(__name__)
ANALOG_PARAMETERS_PATH = Path('~/.config/scopething/analog.conf').expanduser()
Log = logging.getLogger(__name__)
AnalogParametersPath = Path('~/.config/scopething/analog.conf').expanduser()
class UsageError(Exception):
pass
class ConfigurationError(Exception):
pass
@ -39,13 +47,13 @@ class Scope(vm.VirtualMachine):
async def connect(self, url=None):
if url is None:
for port in streams.SerialStream.ports_matching(vid=0x0403, pid=0x6001):
url = f'file:{port.device}'
for device in streams.SerialStream.devices_matching(vid=0x0403, pid=0x6001):
url = f'file:{device}'
break
else:
raise RuntimeError("No matching serial device found")
LOG.info(f"Connecting to scope at {url}")
self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file')
if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -59,14 +67,14 @@ class Scope(vm.VirtualMachine):
return self
async def reset(self):
LOG.info("Resetting scope")
Log.info("Resetting scope")
await self.issue_reset()
await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501':
self.master_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate
self.capture_buffer_size = 12<<10
self.primary_clock_rate = 40000000
self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024
self.awg_minimum_clock = 33
@ -75,38 +83,38 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07
self.analog_hi_max = 0.88
self.timeout_clock_period = (1<<8) * self.master_clock_period
self.timestamp_rollover = (1<<32) * self.master_clock_period
self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else:
raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False
self._clock_running = False
self.load_analog_params()
LOG.info(f"Initialised scope, revision: {revision}")
Log.info(f"Initialised scope, revision: {revision}")
def load_analog_params(self):
config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH)
config.read(AnalogParametersPath)
analog_params = {}
for url in config.sections():
if url == self.url:
for probes in config[url]:
params = self.AnalogParams(*map(float, config[url][probes].split()))
analog_params[probes] = params
LOG.debug(f"Loading saved parameters for {probes}: {params!r}")
Log.debug(f"Loading saved parameters for {probes}: {params!r}")
if analog_params:
self.analog_params.update(analog_params)
LOG.info(f"Loaded analog parameters for probes: {', '.join(analog_params.keys())}")
Log.info(f"Loaded analog parameters for probes: {', '.join(analog_params.keys())}")
def save_analog_params(self):
LOG.info("Saving analog parameters")
Log.info("Saving analog parameters")
config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH)
config.read(AnalogParametersPath)
config[self.url] = {probes: ' '.join(map(str, self.analog_params[probes])) for probes in self.analog_params}
parent = ANALOG_PARAMETERS_PATH.parent
parent = AnalogParametersPath.parent
if not parent.is_dir():
parent.mkdir(parents=True)
with open(ANALOG_PARAMETERS_PATH, 'w') as parameters_file:
with open(AnalogParametersPath, 'w') as parameters_file:
config.write(parameters_file)
def __enter__(self):
@ -116,16 +124,16 @@ class Scope(vm.VirtualMachine):
self.close()
def close(self):
super().close()
LOG.info("Closed scope")
if super().close():
Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams):
params = self.AnalogParams(*list(params) + [None]*(11-len(params)))
l = (low - params.offset) / params.scale
h = (high - params.offset) / params.scale
dl = params.la*l + params.lb*h + params.lc
dh = params.ha*h + params.hb*l + params.hc
lo = (low - params.offset) / params.scale
hi = (high - params.offset) / params.scale
dl = params.la*lo + params.lb*hi + params.lc
dh = params.ha*hi + params.hb*lo + params.hc
return dl, dh
async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False,
@ -157,45 +165,47 @@ class Scope(vm.VirtualMachine):
logic_channels.remove(7)
if 'B' in analog_channels and 6 in logic_channels:
logic_channels.remove(6)
analog_enable = sum(1<<(ord(channel)-ord('A')) for channel in analog_channels)
logic_enable = sum(1<<channel for channel in logic_channels)
analog_enable = sum(1 << (ord(channel)-ord('A')) for channel in analog_channels)
logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples))
ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
LOG.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1):
LOG.debug(f"- try with tick count {ticks} x {clock_scale}")
clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks > capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else:
continue
elif ticks >= capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
LOG.debug(f"- try with tick count {ticks}")
Log.debug(f"- try with tick count {ticks}")
else:
LOG.debug(f"- mode too slow")
Log.debug("- mode too slow")
continue
n = int(round(period / self.master_clock_period / ticks / clock_scale))
actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2:
n -= n % 2
actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels:
buffer_width //= 2
if n <= buffer_width:
LOG.debug(f"- OK; period is {n} samples")
nsamples = n
if actual_nsamples <= buffer_width:
Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = actual_nsamples
break
LOG.debug(f"- insufficient buffer space for necessary {n} samples")
Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else:
raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period
sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6:
LOG.warn(f"Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0
if raw:
@ -206,11 +216,11 @@ class Scope(vm.VirtualMachine):
if low is None:
low = analog_params.safe_low if analog_channels else self.logic_low
elif low < analog_params.safe_low:
LOG.warning(f"Voltage range is below safe minimum: {low} < {analog_params.safe_low}")
Log.warning(f"Voltage range is below safe minimum: {low} < {analog_params.safe_low}")
if high is None:
high = analog_params.safe_high if analog_channels else self.logic_high
elif high > analog_params.safe_high:
LOG.warning(f"Voltage range is above safe maximum: {high} > {analog_params.safe_high}")
Log.warning(f"Voltage range is above safe maximum: {high} > {analog_params.safe_high}")
lo, hi = self.calculate_lo_hi(low, high, analog_params)
spock_option = vm.SpockOption.TriggerTypeHardwareComparator
@ -225,9 +235,23 @@ class Scope(vm.VirtualMachine):
kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable
if trigger_level is None:
trigger_level = (high + low) / 2
if not raw:
trigger_level = (trigger_level - analog_params.offset) / analog_params.scale
if trigger == 'A' or trigger == 'B':
analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
if isinstance(trigger, dict):
trigger_logic = 0
trigger_mask = 0xff
for channel, value in trigger.items():
if isinstance(channel, str):
if channel.startswith('L'):
channel = int(channel[1:]) # noqa
else:
raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7:
raise ValueError("Unrecognised trigger value")
mask = 1 << channel
trigger_mask &= ~mask
if value:
trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
@ -235,21 +259,6 @@ class Scope(vm.VirtualMachine):
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
trigger_logic = 0
trigger_mask = 0xff
for channel, value in trigger.items():
if isinstance(channel, str):
if channel.startswith('L'):
channel = int(channel[1:])
else:
raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7:
raise ValueError("Unrecognised trigger value")
mask = 1<<channel
trigger_mask &= ~mask
if value:
trigger_logic |= mask
else:
raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower()
@ -265,20 +274,19 @@ class Scope(vm.VirtualMachine):
if timeout is None:
trigger_timeout = 0
else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period
+ timeout)/self.timeout_clock_period))
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0:
raise ConfigurationError("Required trigger timeout too long")
else:
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
LOG.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
async with self.transaction():
await self.set_registers(TraceMode=capture_mode.trace_mode, BufferMode=capture_mode.buffer_mode,
SampleAddress=0, ClockTicks=ticks, ClockScale=clock_scale,
TriggerLevel=trigger_level, TriggerLogic=trigger_logic, TriggerMask=trigger_mask,
TriggerLevel=analog_trigger_level, TriggerLogic=trigger_logic, TriggerMask=trigger_mask,
TraceIntro=trace_intro, TraceOutro=trace_outro, TraceDelay=0, Timeout=trigger_timeout,
TriggerIntro=trigger_intro//2, TriggerOutro=trigger_outro//2, Prelude=0,
SpockOption=spock_option, ConverterLo=lo, ConverterHi=hi,
@ -287,7 +295,6 @@ class Scope(vm.VirtualMachine):
await self.issue_program_spock_registers()
await self.issue_configure_device_hardware()
await self.issue_triggered_trace()
begin_timestamp = None
while True:
try:
code, timestamp = (int(x, 16) for x in await self.read_replies(2))
@ -298,15 +305,15 @@ class Scope(vm.VirtualMachine):
cause = {vm.TraceStatus.Done: 'trigger', vm.TraceStatus.Auto: 'timeout', vm.TraceStatus.Stop: 'cancel'}[code]
start_timestamp = timestamp - nsamples*ticks*clock_scale
if start_timestamp < 0:
start_timestamp += 1<<32
timestamp += 1<<32
start_timestamp += 1 << 32
timestamp += 1 << 32
address = int((await self.read_replies(1))[0], 16)
if capture_mode.analog_channels == 2:
address -= address % 2
traces = DotDict()
timestamps = array.array('d', (t*self.master_clock_period for t in range(start_timestamp, timestamp, ticks*clock_scale)))
start_time = start_timestamp*self.master_clock_period
timestamps = array.array('d', (i * sample_period for i in range(nsamples)))
for dump_channel, channel in enumerate(sorted(analog_channels)):
asamples = nsamples // len(analog_channels)
async with self.transaction():
@ -317,11 +324,18 @@ class Scope(vm.VirtualMachine):
await self.issue_analog_dump_binary()
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
traces[channel] = DotDict({'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels),
'sample_rate': sample_rate/len(analog_channels),
'cause': cause})
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels),
'sample_rate': sample_rate/len(analog_channels),
'cause': cause})
if cause == 'trigger' and channel == trigger:
series.trigger_timestamp = series.timestamps[trigger_samples // len(analog_channels)]
series.trigger_level = trigger_level
series.trigger_type = trigger_type
traces[channel] = series
if logic_channels:
async with self.transaction():
await self.set_registers(SampleAddress=(address - nsamples) % buffer_width,
@ -330,13 +344,21 @@ class Scope(vm.VirtualMachine):
await self.issue_analog_dump_binary()
data = await self.read_logic_samples(nsamples)
for i in logic_channels:
mask = 1<<i
traces[f'L{i}'] = DotDict({'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period,
'sample_rate': sample_rate,
'cause': cause})
LOG.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
mask = 1 << i
channel = f'L{i}'
series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period,
'sample_rate': sample_rate,
'cause': cause})
if cause == 'trigger' and isinstance(trigger, dict) and i in trigger:
series.trigger_timestamp = series.timestamps[trigger_samples]
series.trigger_level = trigger[i]
series.trigger_type = trigger_type
traces[channel] = series
Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
return traces
async def start_waveform(self, frequency, waveform='sine', ratio=0.5, low=0, high=None, min_samples=50, max_error=1e-4):
@ -348,25 +370,25 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high:
raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size)))
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None
for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock
width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock
actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency:
LOG.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break
error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]):
if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf
else:
if best_solution is None:
raise ConfigurationError("No solution to required frequency/min_samples/max_error")
error, size, nwaves, clock, actualf = best_solution
LOG.debug(f"Best solution: size={size} nwaves={nwaves} clock={clock} actualf={actualf}")
Log.debug(f"Best solution: size={size} nwaves={nwaves} clock={clock} actualf={actualf}")
async with self.transaction():
if isinstance(waveform, str):
mode = {'sine': 0, 'triangle': 1, 'exponential': 2, 'square': 3}[waveform.lower()]
@ -393,7 +415,7 @@ class Scope(vm.VirtualMachine):
await self.set_registers(KitchenSinkB=vm.KitchenSinkB.WaveformGeneratorEnable)
await self.issue_configure_device_hardware()
self._awg_running = True
LOG.info(f"Signal generator running at {actualf:0.1f}Hz")
Log.info(f"Signal generator running at {actualf:0.1f}Hz")
return actualf
async def stop_waveform(self):
@ -404,22 +426,22 @@ class Scope(vm.VirtualMachine):
await self.issue_control_clock_generator()
await self.set_registers(KitchenSinkB=0)
await self.issue_configure_device_hardware()
LOG.info("Signal generator stopped")
Log.info("Signal generator stopped")
self._awg_running = False
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks
actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction():
await self.set_registers(Map5=0x12, Clock=ticks, Rise=0, Fall=fall, Control=0x80, Cmd=3, Mode=0)
await self.issue_control_clock_generator()
self._clock_running = True
LOG.info(f"Clock generator running at {actualf:0.1f}Hz, {actualr*100:.0f}% duty cycle")
Log.info(f"Clock generator running at {actualf:0.1f}Hz, {actualr*100:.0f}% duty cycle")
return actualf, actualr
async def stop_clock(self):
@ -428,12 +450,12 @@ class Scope(vm.VirtualMachine):
async with self.transaction():
await self.set_registers(Map5=0, Cmd=1, Mode=0)
await self.issue_control_clock_generator()
LOG.info("Clock generator stopped")
Log.info("Clock generator stopped")
self._clock_running = False
async def calibrate(self, probes='x1', n=32, save=True):
"""
Derive values for the analogue parameters based on generating a 3.3V 10kHz clock
Derive values for the analogue parameters based on generating a 3.3V 2kHz clock
signal and then sampling the analogue channels to measure this. The first step is
to set the low and high range DACs to 1/3 and 2/3, respectively. This results in
*neutral* voltages matching the three series 300Ω resistances created by the ADC
@ -458,84 +480,88 @@ class Scope(vm.VirtualMachine):
import numpy as np
from scipy.optimize import minimize
items = []
async def measure(lo, hi, period=2e-3, chop=True):
if chop:
traces = await self.capture(channels=['A','B'], period=period, nsamples=2000, timeout=0, low=lo, high=hi, raw=True)
traces = await self.capture(channels=['A', 'B'], period=period, nsamples=2000, timeout=0, low=lo, high=hi, raw=True)
A = np.array(traces.A.samples)
B = np.array(traces.B.samples)
else:
A = np.array((await self.capture(channels=['A'], period=period/2, nsamples=1000, timeout=0, low=lo, high=hi, raw=True)).A.samples)
B = np.array((await self.capture(channels=['B'], period=period/2, nsamples=1000, timeout=0, low=lo, high=hi, raw=True)).B.samples)
Amean = A.mean()
Azero, Afull = np.median(A[A<=Amean]), np.median(A[A>=Amean])
Azero, Afull = np.median(A[A <= Amean]), np.median(A[A >= Amean])
Bmean = B.mean()
Bzero, Bfull = np.median(B[B<=Bmean]), np.median(B[B>=Bmean])
Bzero, Bfull = np.median(B[B <= Bmean]), np.median(B[B >= Bmean])
return (Azero + Bzero) / 2, (Afull + Bfull) / 2, ((Afull - Bfull) + (Azero - Azero)) / 2
await self.start_clock(frequency=2000)
zero, full, offset = await measure(1/3, 2/3)
zero = (zero + 1) / 3
full = (full + 1) / 3
analog_scale = self.clock_voltage / (full - zero)
analog_offset = -zero * analog_scale
LOG.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V")
Log.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V")
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n):
period = 2e-3 if len(items) % 4 < 2 else 1e-3
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero:
if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T
lo, hi, low, high, offset = np.array(items).T # noqa
def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
return np.sqrt((lo-dl)**2 + (hi-dh)**2).mean()
start_params = self.analog_params.get(probes, [1,0,0,1,0,0])[:6]
start_params = self.analog_params.get(probes, [1, 0, 0, 1, 0, 0])[:6]
result = minimize(f, start_params, method='SLSQP',
bounds=[(1,np.inf), (-np.inf,0), (0,np.inf), (1,np.inf), (-np.inf,0), (-np.inf,0)],
bounds=[(1, np.inf), (-np.inf, 0), (0, np.inf), (1, np.inf), (-np.inf, 0), (-np.inf, 0)],
constraints=[{'type': 'eq', 'fun': lambda x: x[0]*1/3 + x[1]*2/3 + x[2] - 1/3},
{'type': 'eq', 'fun': lambda x: x[3]*2/3 + x[4]*1/3 + x[5] - 2/3}])
if result.success:
LOG.info(f"Calibration succeeded: {result.message}")
Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x):
def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
safe_low, safe_high = minimize(f, (low[0], high[0])).x
offset_mean = offset.mean()
params = self.analog_params[probes] = self.AnalogParams(*result.x, analog_scale, analog_offset, safe_low, safe_high, offset_mean)
LOG.info(f"{params!r} ±{100*offset.std()/offset_mean:.1f}%)")
Log.info(f"{params!r} ±{100*offset.std()/offset_mean:.1f}%)")
clo, chi = self.calculate_lo_hi(low, high, params)
lo_error = np.sqrt((((clo-lo)/(hi-lo))**2).mean())
hi_error = np.sqrt((((chi-hi)/(hi-lo))**2).mean())
LOG.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps")
Log.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps")
if save:
self.save_analog_params()
else:
LOG.warning(f"Calibration failed: {result.message}")
Log.warning(f"Calibration failed: {result.message}")
return result.success
def __repr__(self):
return f"<Scope {self.url}>"
"""
$ ipython3 --pylab
Using matplotlib backend: MacOSX
# $ ipython3 --pylab
# Using matplotlib backend: MacOSX
#
# In [1]: run scope
#
# In [2]: start_waveform(2000, 'triangle')
# Out[2]: 2000.0
#
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
#
# In [4]: plot(traces.A.timestamps, traces.A.samples)
# Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
#
# In [5]: plot(traces.B.timestamps, traces.B.samples)
# Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
In [1]: run scope
In [2]: start_waveform(2000, 'triangle')
Out[2]: 2000.0
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [4]: plot(traces.A.timestamps, traces.A.samples)
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
In [5]: plot(traces.B.timestamps, traces.B.samples)
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
async def main():
global s
@ -547,6 +573,7 @@ async def main():
logging.basicConfig(level=logging.DEBUG if args.debug else (logging.INFO if args.verbose else logging.WARNING), stream=sys.stdout)
s = await Scope().connect(args.url)
def await_(g):
task = asyncio.Task(g)
while True:
@ -555,23 +582,28 @@ def await_(g):
except KeyboardInterrupt:
task.cancel()
def capture(*args, **kwargs):
return await_(s.capture(*args, **kwargs))
def capturep(*args, **kwargs):
import pandas
traces = capture(*args, **kwargs)
return pandas.DataFrame({channel: pandas.Series(trace.samples, trace.timestamps) for (channel,trace) in traces.items()})
return pandas.DataFrame({channel: pandas.Series(trace.samples, trace.timestamps) for (channel, trace) in traces.items()})
def calibrate(*args, **kwargs):
return await_(s.calibrate(*args, **kwargs))
def start_waveform(*args, **kwargs):
return await_(s.start_waveform(*args, **kwargs))
def start_clock(*args, **kwargs):
return await_(s.start_clock(*args, **kwargs))
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO.
"""
# pylama:ignore=W1203,R0916,W0703
import asyncio
import logging
import sys
@ -14,29 +16,29 @@ import serial
from serial.tools.list_ports import comports
LOG = logging.getLogger(__name__)
Log = logging.getLogger(__name__)
class SerialStream:
@classmethod
def ports_matching(cls, vid=None, pid=None, serial=None):
def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number):
yield port
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device
@classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs):
for port in cls.devices_matching(vid, pid, serial):
return SerialStream(port.device, **kwargs)
def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device")
def __init__(self, device, use_threads=None, loop=None, **kwargs):
self._device = device
self._use_threads = sys.platform == 'win32' if use_threads is None else use_threads
self._connection = serial.Serial(self._device, **kwargs) if self._use_threads else \
serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
LOG.debug(f"Opened SerialStream on {device}")
serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
Log.debug(f"Opened SerialStream on {device}")
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._output_buffer = bytes()
self._output_buffer_empty = None
@ -59,15 +61,15 @@ class SerialStream:
return
if not self._output_buffer:
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
except serial.SerialTimeoutException:
n = 0
nbytes = 0
except Exception:
LOG.exception("Error writing to stream")
Log.exception("Error writing to stream")
raise
if n:
LOG.debug(f"Write {data[:n]!r}")
self._output_buffer = data[n:]
if nbytes:
Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[nbytes:]
else:
self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self):
try:
n = self._connection.write(self._output_buffer)
nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException:
n = 0
except Exception as e:
LOG.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e)
nbytes = 0
except Exception as exc:
Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection)
if n:
LOG.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
if nbytes:
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer:
self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None)
@ -101,41 +103,39 @@ class SerialStream:
data = bytes(self._output_buffer)
self._output_buffer_lock.release()
try:
n = self._connection.write(data)
nbytes = self._connection.write(data)
finally:
self._output_buffer_lock.acquire()
LOG.debug(f"Write {self._output_buffer[:n]!r}")
self._output_buffer = self._output_buffer[n:]
Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None
async def read(self, n=None):
async def read(self, nbytes=None):
if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n)
return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True:
w = self._connection.in_waiting
if w:
data = self._connection.read(w if n is None else min(n, w))
LOG.debug(f"Read {data!r}")
nwaiting = self._connection.in_waiting
if nwaiting:
data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
Log.debug(f"Read {data!r}")
return data
else:
future = self._loop.create_future()
self._loop.add_reader(self._connection, future.set_result, None)
try:
await future
finally:
self._loop.remove_reader(self._connection)
future = self._loop.create_future()
self._loop.add_reader(self._connection, future.set_result, None)
try:
await future
finally:
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None):
def _read_blocking(self, nbytes=None):
data = self._connection.read(1)
w = self._connection.in_waiting
if w and (n is None or n > 1):
data += self._connection.read(w if n is None else min(n-1, w))
LOG.debug(f"Read {data!r}")
nwaiting = self._connection.in_waiting
if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
Log.debug(f"Read {data!r}")
return data
async def readexactly(self, n):
async def readexactly(self, nbytes):
data = b''
while len(data) < n:
data += await self.read(n-len(data))
while len(data) < nbytes:
data += await self.read(nbytes-len(data))
return data

27
test.py Normal file
View File

@ -0,0 +1,27 @@
from pylab import figure, plot, show
from analysis import annotate_series
from scope import await_, capture, main
await_(main())
series = capture(['A'], period=20e-3, nsamples=2000).A
figure(1)
plot(series.timestamps, series.samples)
if annotate_series(series):
waveform = series.waveform
if 'duty_cycle' in waveform:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
else:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with amplitude ±{waveform.amplitude:.2f}V and offset {waveform.offset:.2f}V")
plot(waveform.timestamps + waveform.capture_start - series.capture_start,
waveform.samples * waveform.amplitude + waveform.offset)
show()

View File

@ -1,8 +1,12 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict):
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__

145
vm.py
View File

@ -1,4 +1,3 @@
"""
vm
==
@ -7,14 +6,15 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B].
document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
"""
# pylama:ignore=E221,C0326,R0904,W1203
import array
import asyncio
from collections import namedtuple
from enum import IntEnum
import logging
@ -23,7 +23,7 @@ import struct
from utils import DotDict
LOG = logging.getLogger(__name__)
Log = logging.getLogger(__name__)
class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
@ -36,30 +36,32 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else:
width = int(self.dtype[1:])
if sign == 'U':
n = 1 << width
value = max(0, min(value, n-1))
bs = struct.pack('<I', value)
max_value = (1 << width) - 1
value = min(max(0, value), max_value)
data = struct.pack('<I', value)
elif sign == 'S':
n = 1 << (width - 1)
value = max(-n, min(value, n-1))
bs = struct.pack('<i', value)
max_value = (1 << (width - 1))
value = min(max(-max_value, value), max_value - 1)
data = struct.pack('<i', value)
else:
raise TypeError("Unrecognised dtype")
return bs[:width//8]
def decode(self, bs):
if len(bs) < 4:
bs = bs + bytes(4 - len(bs))
return data[:width//8]
def decode(self, data):
if len(data) < 4:
data = data + bytes(4 - len(data))
sign = self.dtype[0]
if sign == 'U':
value = struct.unpack('<I', bs)[0]
value = struct.unpack('<I', data)[0]
elif sign == 'S':
value = struct.unpack('<i', bs)[0]
value = struct.unpack('<i', data)[0]
else:
raise TypeError("Unrecognised dtype")
if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1))
value = value / (1 << fraction)
fraction = int(self.dtype.split('.', 1)[1])
value /= (1 << fraction)
return value
@property
def maximum_value(self):
if '.' in self.dtype:
@ -68,14 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S':
whole -= 1
n = (1<<(whole+fraction)) - 1
return n / (1<<fraction) if fraction else n
max_value = (1 << (whole+fraction)) - 1
return max_value / (1 << fraction) if fraction else max_value
@property
def width(self):
if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else:
return int(self.dtype[1:]) // 8
return int(self.dtype[1:]) // 8
Registers = DotDict({
@ -88,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"),
"ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -149,19 +151,20 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
"PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
})
class TraceMode(IntEnum):
Analog = 0
Mixed = 1
AnalogChop = 2
MixedChop = 3
AnalogFast = 4
MixedFast = 5
AnalogFastChop = 6
MixedFastChop = 7
Analog = 0
Mixed = 1
AnalogChop = 2
MixedChop = 3
AnalogFast = 4
MixedFast = 5
AnalogFastChop = 6
MixedFastChop = 7
AnalogShot = 11
MixedShot = 12
LogicShot = 13
@ -172,6 +175,7 @@ class TraceMode(IntEnum):
Macro = 18
MacroChop = 19
class BufferMode(IntEnum):
Single = 0
Chop = 1
@ -180,6 +184,7 @@ class BufferMode(IntEnum):
Macro = 4
MacroChop = 5
class DumpMode(IntEnum):
Raw = 0
Burst = 1
@ -190,6 +195,7 @@ class DumpMode(IntEnum):
Filter = 6
Span = 7
class SpockOption(IntEnum):
TriggerInvert = 0x40
TriggerSourceA = 0x04 * 0
@ -198,23 +204,28 @@ class SpockOption(IntEnum):
TriggerTypeSampledAnalog = 0x01 * 0
TriggerTypeHardwareComparator = 0x01 * 1
class KitchenSinkA(IntEnum):
ChannelAComparatorEnable = 0x80
ChannelBComparatorEnable = 0x40
class KitchenSinkB(IntEnum):
AnalogFilterEnable = 0x80
WaveformGeneratorEnable = 0x40
class TraceStatus(IntEnum):
Done = 0x00
Auto = 0x01
Wait = 0x02
Stop = 0x03
CaptureMode = namedtuple('CaptureMode', ('trace_mode', 'clock_low', 'clock_high', 'clock_divide',
'analog_channels', 'sample_width', 'logic_channels', 'buffer_mode'))
CaptureModes = [
CaptureMode(TraceMode.Macro, 40, 16384, 1, 1, 2, False, BufferMode.Macro),
CaptureMode(TraceMode.MacroChop, 40, 16384, 1, 2, 2, False, BufferMode.MacroChop),
@ -241,12 +252,15 @@ class VirtualMachine:
class Transaction:
def __init__(self, vm):
self._vm = vm
self._data = b''
def append(self, cmd):
self._data += cmd
async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self)
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self._vm._transactions.pop() != self:
raise RuntimeError("Mis-ordered transactions")
@ -265,6 +279,8 @@ class VirtualMachine:
self._writer.close()
self._writer = None
self._reader = None
return True
return False
__del__ = close
@ -275,7 +291,7 @@ class VirtualMachine:
if isinstance(cmd, str):
cmd = cmd.encode('ascii')
if not self._transactions:
LOG.debug(f"Issue: {cmd!r}")
Log.debug(f"Issue: {cmd!r}")
self._writer.write(cmd)
await self._writer.drain()
echo = await self._reader.readexactly(len(cmd))
@ -284,16 +300,16 @@ class VirtualMachine:
else:
self._transactions[-1].append(cmd)
async def read_replies(self, n):
async def read_replies(self, nreplies):
if self._transactions:
raise TypeError("Command transaction in progress")
replies = []
data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n:
while len(replies) < nreplies:
index = data.find(b'\r')
if index >= 0:
reply = data[:index]
LOG.debug(f"Read reply: {reply!r}")
Log.debug(f"Read reply: {reply!r}")
replies.append(reply)
data = data[index+1:]
else:
@ -305,35 +321,35 @@ class VirtualMachine:
async def issue_reset(self):
if self._transactions:
raise TypeError("Command transaction in progress")
LOG.debug("Issue reset")
Log.debug("Issue reset")
self._writer.write(b'!')
await self._writer.drain()
while not (await self._reader.read(1000)).endswith(b'!'):
pass
self._reply_buffer = b''
LOG.debug("Reset complete")
Log.debug("Reset complete")
async def set_registers(self, **kwargs):
cmd = ''
r0 = r1 = None
register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name]
bs = register.encode(kwargs[name])
LOG.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
for i, byte in enumerate(bs):
data = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(data):
if cmd:
cmd += 'z'
r1 += 1
register1 += 1
address = base + i
if r1 is None or address > r1 + 3:
if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@'
r0 = r1 = address
register0 = register1 = address
else:
cmd += 'n' * (address - r1)
r1 = address
if byte != r0:
cmd += 'n' * (address - register1)
register1 = address
if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte
register0 = byte
if cmd:
await self.issue(cmd + 's')
@ -366,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self):
await self.issue(b'D')
async def read_analog_samples(self, n, sample_width):
async def read_analog_samples(self, nsamples, sample_width):
if self._transactions:
raise TypeError("Command transaction in progress")
if sample_width == 2:
data = await self._reader.readexactly(2*n)
data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1:
data = await self._reader.readexactly(n)
if sample_width == 1:
data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data))
else:
raise ValueError(f"Bad sample width: {sample_width}")
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n):
async def read_logic_samples(self, nsamples):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self):
await self.issue(b'K')
@ -395,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self):
await self.issue(b'R')
async def wavetable_read_bytes(self, n):
async def wavetable_read_bytes(self, nbytes):
if self._transactions:
raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n)
return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs):
async def wavetable_write_bytes(self, data):
cmd = ''
last_byte = None
for byte in bs:
for byte in data:
if byte != last_byte:
cmd += f'{byte:02x}'
cmd += 'W'
@ -425,5 +440,3 @@ class VirtualMachine:
async def issue_write_eeprom(self):
await self.issue(b'w')