1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 11:12:09 +01:00

Compare commits

..

13 Commits

8 changed files with 273 additions and 196 deletions

View File

@ -8,3 +8,58 @@
- Also requires **NumPy** and **SciPy** if you want to do analog range calibration - Also requires **NumPy** and **SciPy** if you want to do analog range calibration
- Having **Pandas** is useful for wrapping capture results for further processing - Having **Pandas** is useful for wrapping capture results for further processing
## Longer Notes
### Why have I written this?
BitScope helpfully provide applications and libraries for talking to their USB
capture devices, so one can just use those. I wrote this code because I want
to be able to grab the raw data and further process it in various ways. I'm
accustomed to working at a Python prompt with various toolkits like SciPy and
Matplotlib, so I wanted a simple way to just grab a trace as an array.
The BitScope library is pretty simple to use, but also requires you to
understand a fair amount about how the scope works and make a bunch of decisions
about what capture mode and rate to use. I want to just specify a time period,
voltage range and a rough number of samples and have all of that worked out for
me, same way as I'd use an actual oscilloscope: twiddle the knobs and look at
the trace.
Of course, I could have wrapped the BitScope library in something that would do
this, but after reading a bit about how the scope works I was fascinated with
understanding it further and so I decided to go back to first principles and
start with just talking to it with a serial library. This code thus serves as
(sort of) documentation for the VM registers, capture modes and how to use them.
It also has the advantage of being pure Python.
The code prefers the highest capture resolution possible and will do the mapping
from high/low/trigger voltages to the mysterious magic numbers that the device
needs. It can also do logic and mixed-signal capture.
In addition to capturing, the code can also generate waveforms at arbitrary
frequencies  something that is tricky to do as the device operates at specific
frequencies and so one has to massage the width of the waveform buffer to get a
frequency outside of these. It can also control the clock generator.
I've gone for an underlying async design as it makes it easy to integrate the
code into UI programs or network servers  both of which interest me as the end
purpose for this code. However, for shell use there are synchronous wrapper
functions. Of particular note is that the synchronous wrapper understands
keyboard interrupt and will cancel a capture returning the trace around the
cancel point. This is useful if your trigger doesn't fire and you want to
understand why.
### Where's the documentation, mate?
Yeah, yeah. I know.
### Also, I see no unit tests...
It's pretty hard to do unit tests for a physical device. That's my excuse and
I'm sticking to it.
### Long lines and ignoring E221, eh?
"A foolish consistency is the hobgoblin of little minds"
Also, I haven't used an 80 character wide terminal in this century.

Binary file not shown.

View File

@ -1,6 +1,16 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np import numpy as np
from utils import DotDict
def interpolate_min_x(f, x): def interpolate_min_x(f, x):
return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
@ -39,7 +49,7 @@ def moving_average(samples, width, mode='wrap'):
def calculate_periodicity(series, window=0.1): def calculate_periodicity(series, window=0.1):
samples = np.array(series.samples) samples = np.array(series.samples, dtype='double')
window = int(len(samples) * window) window = int(len(samples) * window)
errors = np.zeros(len(samples) - window) errors = np.zeros(len(samples) - window)
for i in range(1, len(errors) + 1): for i in range(1, len(errors) + 1):
@ -60,7 +70,7 @@ def extract_waveform(series, period):
p = int(round(series.sample_rate * period)) p = int(round(series.sample_rate * period))
n = len(series.samples) // p n = len(series.samples) // p
if n <= 2: if n <= 2:
return None, None return None, None, None, None
samples = np.array(series.samples)[:p*n] samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum() cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p underlying = (cumsum[p:] - cumsum[:-p]) / p
@ -92,8 +102,8 @@ def normalize_waveform(samples, smooth=7):
crossings.append((i - last_rising, last_rising)) crossings.append((i - last_rising, last_rising))
if first_falling is not None: if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising)) crossings.append((n + first_falling - last_rising, last_rising))
width, first = min(crossings) first = min(crossings)[1]
wave = np.hstack([smoothed[first:], smoothed[:first]]) / scale wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings) return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
@ -102,7 +112,7 @@ def characterize_waveform(samples, crossings):
possibles = [] possibles = []
if len(crossings) == 1: if len(crossings) == 1:
duty_cycle = crossings[0][1] / n duty_cycle = crossings[0][1] / n
if duty_cycle > 0.45 and duty_cycle < 0.55: if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None)) possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None)) possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None)) possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))
@ -111,7 +121,7 @@ def characterize_waveform(samples, crossings):
return possibles return possibles
def analyze_series(series): def annotate_series(series):
period = calculate_periodicity(series) period = calculate_periodicity(series)
if period is not None: if period is not None:
waveform = DotDict(period=period, frequency=1 / period) waveform = DotDict(period=period, frequency=1 / period)
@ -122,6 +132,10 @@ def analyze_series(series):
waveform.count = count waveform.count = count
waveform.amplitude = scale waveform.amplitude = scale
waveform.offset = underlying.mean() + offset waveform.offset = underlying.mean() + offset
waveform.timestamps = np.arange(len(wave)) * series.sample_period
waveform.sample_period = series.sample_period
waveform.sample_rate = series.sample_rate
waveform.capture_start = series.capture_start + waveform.beginning * series.sample_period
possibles = characterize_waveform(wave, crossings) possibles = characterize_waveform(wave, crossings)
if possibles: if possibles:
error, shape, duty_cycle = possibles[0] error, shape, duty_cycle = possibles[0]
@ -132,37 +146,5 @@ def analyze_series(series):
else: else:
waveform.shape = 'unknown' waveform.shape = 'unknown'
series.waveform = waveform series.waveform = waveform
return True
return False
# %%
from pylab import figure, plot, show
from utils import DotDict
o = 400
m = 5
n = o * m
samples = square_wave(o)
samples = np.hstack([samples] * m) * 2
samples = np.hstack([samples[100:], samples[:100]])
samples += np.random.normal(size=n) * 0.1
samples += np.linspace(4.5, 5.5, n)
series = DotDict(samples=samples, sample_rate=1000000)
analyze_series(series)
if 'waveform' in series:
waveform = series.waveform
if 'duty_cycle' in waveform:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
else:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
figure(1)
plot(series.samples)
wave = np.hstack([waveform.samples[-waveform.beginning:]] + [waveform.samples] * waveform.count + [waveform.samples[:-waveform.beginning]])
plot(wave * waveform.amplitude + waveform.offset)
show()

137
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3 """
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse import argparse
import array import array
@ -43,8 +52,8 @@ class Scope(vm.VirtualMachine):
break break
else: else:
raise RuntimeError("No matching serial device found") raise RuntimeError("No matching serial device found")
Log.info(f"Connecting to scope at {url}")
self.close() self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file') parts = urlparse(url, scheme='file')
if parts.scheme == 'file': if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path) self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -63,8 +72,8 @@ class Scope(vm.VirtualMachine):
await self.issue_get_revision() await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii') revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501': if revision == 'BS000501':
self.master_clock_rate = 40000000 self.primary_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10 self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024 self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024 self.awg_sample_buffer_size = 1024
@ -74,8 +83,8 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)} self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07 self.analog_lo_min = 0.07
self.analog_hi_max = 0.88 self.analog_hi_max = 0.88
self.timeout_clock_period = (1 << 8) * self.master_clock_period self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else: else:
raise RuntimeError(f"Unsupported scope, revision: {revision}") raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False self._awg_running = False
@ -115,8 +124,8 @@ class Scope(vm.VirtualMachine):
self.close() self.close()
def close(self): def close(self):
super().close() if super().close():
Log.info("Closed scope") Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params): def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams): if not isinstance(params, self.AnalogParams):
@ -160,14 +169,16 @@ class Scope(vm.VirtualMachine):
logic_enable = sum(1 << channel for channel in logic_channels) logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes: for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples)) ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1 clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels): if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...") Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1: if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high)) clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale)) ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1): if ticks > capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}") Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else: else:
continue continue
@ -178,23 +189,23 @@ class Scope(vm.VirtualMachine):
else: else:
Log.debug("- mode too slow") Log.debug("- mode too slow")
continue continue
n = int(round(period / self.master_clock_period / ticks / clock_scale)) actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2: if len(analog_channels) == 2:
n -= n % 2 actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels: if logic_channels and analog_channels:
buffer_width //= 2 buffer_width //= 2
if n <= buffer_width: if actual_nsamples <= buffer_width:
Log.debug(f"- OK; period is {n} samples") Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = n nsamples = actual_nsamples
break break
Log.debug(f"- insufficient buffer space for necessary {n} samples") Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else: else:
raise ConfigurationError("Unable to find appropriate capture mode") raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6: if trigger_position and sample_rate > 5e6:
Log.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0") Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0 trigger_position = 0
if raw: if raw:
@ -225,21 +236,13 @@ class Scope(vm.VirtualMachine):
if trigger_level is None: if trigger_level is None:
trigger_level = (high + low) / 2 trigger_level = (high + low) / 2
analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
if trigger == 'A' or trigger == 'B': if isinstance(trigger, dict):
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
trigger_logic = 0 trigger_logic = 0
trigger_mask = 0xff trigger_mask = 0xff
for channel, value in trigger.items(): for channel, value in trigger.items():
if isinstance(channel, str): if isinstance(channel, str):
if channel.startswith('L'): if channel.startswith('L'):
channel = int(channel[1:]) channel = int(channel[1:]) # noqa
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7: if channel < 0 or channel > 7:
@ -248,6 +251,14 @@ class Scope(vm.VirtualMachine):
trigger_mask &= ~mask trigger_mask &= ~mask
if value: if value:
trigger_logic |= mask trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower() trigger_type = trigger_type.lower()
@ -263,13 +274,12 @@ class Scope(vm.VirtualMachine):
if timeout is None: if timeout is None:
trigger_timeout = 0 trigger_timeout = 0
else: else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period)) + timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value: if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0: if timeout > 0:
raise ConfigurationError("Required trigger timeout too long") raise ConfigurationError("Required trigger timeout too long")
else: raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture " Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})") f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
@ -302,7 +312,8 @@ class Scope(vm.VirtualMachine):
address -= address % 2 address -= address % 2
traces = DotDict() traces = DotDict()
timestamps = array.array('d', (t*self.master_clock_period for t in range(0, timestamp, ticks*clock_scale)))
timestamps = array.array('d', (i * sample_period for i in range(nsamples)))
for dump_channel, channel in enumerate(sorted(analog_channels)): for dump_channel, channel in enumerate(sorted(analog_channels)):
asamples = nsamples // len(analog_channels) asamples = nsamples // len(analog_channels)
async with self.transaction(): async with self.transaction():
@ -314,7 +325,7 @@ class Scope(vm.VirtualMachine):
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1)) value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width) data = await self.read_analog_samples(asamples, capture_mode.sample_width)
series = DotDict({'channel': channel, series = DotDict({'channel': channel,
'start_timestamp': start_timestamp, 'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps, 'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)), 'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels), 'sample_period': sample_period*len(analog_channels),
@ -336,7 +347,7 @@ class Scope(vm.VirtualMachine):
mask = 1 << i mask = 1 << i
channel = f'L{i}' channel = f'L{i}'
series = DotDict({'channel': channel, series = DotDict({'channel': channel,
'start_timestamp': start_timestamp, 'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps, 'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)), 'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period, 'sample_period': sample_period,
@ -346,7 +357,7 @@ class Scope(vm.VirtualMachine):
series.trigger_timestamp = series.timestamps[trigger_samples] series.trigger_timestamp = series.timestamps[trigger_samples]
series.trigger_level = trigger[i] series.trigger_level = trigger[i]
series.trigger_type = trigger_type series.trigger_type = trigger_type
traces[channel] = series traces[channel] = series
Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}") Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
return traces return traces
@ -359,19 +370,19 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})") raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high: if low < 0 or low > high:
raise ValueError("low out of range (0-high)") raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples))) max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size))) min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None best_solution = None
for clock in range(min_clock, max_clock+1): for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width) nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width)) size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency: if actualf == frequency:
Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}") Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break break
error = abs(frequency - actualf) / frequency error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]): if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf best_solution = error, size, nwaves, clock, actualf
else: else:
if best_solution is None: if best_solution is None:
@ -421,9 +432,9 @@ class Scope(vm.VirtualMachine):
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4): async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running: if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use") raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value) ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1) fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error: if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error") raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction(): async with self.transaction():
@ -494,11 +505,11 @@ class Scope(vm.VirtualMachine):
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False): for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n): for hi in np.linspace(self.analog_hi_max, 0.5, n):
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0) zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero: if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero) analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range)) items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock() await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T lo, hi, low, high, offset = np.array(items).T # noqa
def f(params): def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None)) dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
@ -513,7 +524,7 @@ class Scope(vm.VirtualMachine):
Log.info(f"Calibration succeeded: {result.message}") Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None) params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x): def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params) lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2) return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
@ -535,23 +546,21 @@ class Scope(vm.VirtualMachine):
return f"<Scope {self.url}>" return f"<Scope {self.url}>"
""" # $ ipython3 --pylab
$ ipython3 --pylab # Using matplotlib backend: MacOSX
Using matplotlib backend: MacOSX #
# In [1]: run scope
In [1]: run scope #
# In [2]: start_waveform(2000, 'triangle')
In [2]: start_waveform(2000, 'triangle') # Out[2]: 2000.0
Out[2]: 2000.0 #
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3) #
# In [4]: plot(traces.A.timestamps, traces.A.samples)
In [4]: plot(traces.A.timestamps, traces.A.samples) # Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>] #
# In [5]: plot(traces.B.timestamps, traces.B.samples)
In [5]: plot(traces.B.timestamps, traces.B.samples) # Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
async def main(): async def main():

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO. Package for asynchronous serial IO.
""" """
# pylama:ignore=W1203,R0916,W0703
import asyncio import asyncio
import logging import logging
import sys import sys
@ -20,14 +22,14 @@ Log = logging.getLogger(__name__)
class SerialStream: class SerialStream:
@classmethod @classmethod
def devices_matching(cls, vid=None, pid=None, serial=None): def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports(): for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number): if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device yield port.device
@classmethod @classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs): def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial): for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs) return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device") raise RuntimeError("No matching serial device")
@ -59,15 +61,15 @@ class SerialStream:
return return
if not self._output_buffer: if not self._output_buffer:
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception: except Exception:
Log.exception("Error writing to stream") Log.exception("Error writing to stream")
raise raise
if n: if nbytes:
Log.debug(f"Write {data[:n]!r}") Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[n:] self._output_buffer = data[nbytes:]
else: else:
self._output_buffer += data self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None: if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self): def _feed_data(self):
try: try:
n = self._connection.write(self._output_buffer) nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception as e: except Exception as exc:
Log.exception("Error writing to stream") Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e) self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
if n: if nbytes:
Log.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer: if not self._output_buffer:
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None) self._output_buffer_empty.set_result(None)
@ -101,40 +103,39 @@ class SerialStream:
data = bytes(self._output_buffer) data = bytes(self._output_buffer)
self._output_buffer_lock.release() self._output_buffer_lock.release()
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
finally: finally:
self._output_buffer_lock.acquire() self._output_buffer_lock.acquire()
Log.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None self._output_buffer_empty = None
async def read(self, n=None): async def read(self, nbytes=None):
if self._use_threads: if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n) return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True: while True:
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w: if nwaiting:
data = self._connection.read(w if n is None else min(n, w)) data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
Log.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
else: future = self._loop.create_future()
future = self._loop.create_future() self._loop.add_reader(self._connection, future.set_result, None)
self._loop.add_reader(self._connection, future.set_result, None) try:
try: await future
await future finally:
finally: self._loop.remove_reader(self._connection)
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None): def _read_blocking(self, nbytes=None):
data = self._connection.read(1) data = self._connection.read(1)
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w and (n is None or n > 1): if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(w if n is None else min(n-1, w)) data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
Log.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
async def readexactly(self, n): async def readexactly(self, nbytes):
data = b'' data = b''
while len(data) < n: while len(data) < nbytes:
data += await self.read(n-len(data)) data += await self.read(nbytes-len(data))
return data return data

27
test.py Normal file
View File

@ -0,0 +1,27 @@
from pylab import figure, plot, show
from analysis import annotate_series
from scope import await_, capture, main
await_(main())
series = capture(['A'], period=20e-3, nsamples=2000).A
figure(1)
plot(series.timestamps, series.samples)
if annotate_series(series):
waveform = series.waveform
if 'duty_cycle' in waveform:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
else:
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
f"with amplitude ±{waveform.amplitude:.2f}V and offset {waveform.offset:.2f}V")
plot(waveform.timestamps + waveform.capture_start - series.capture_start,
waveform.samples * waveform.amplitude + waveform.offset)
show()

View File

@ -1,8 +1,12 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict): class DotDict(dict):
__getattr__ = dict.__getitem__ __getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__ __setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__ __delattr__ = dict.__delitem__

103
vm.py
View File

@ -1,4 +1,3 @@
""" """
vm vm
== ==
@ -7,12 +6,14 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data. commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B]. document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY [VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
""" """
# pylama:ignore=E221,C0326,R0904,W1203
import array import array
from collections import namedtuple from collections import namedtuple
from enum import IntEnum from enum import IntEnum
@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else: else:
width = int(self.dtype[1:]) width = int(self.dtype[1:])
if sign == 'U': if sign == 'U':
n = 1 << width max_value = (1 << width) - 1
value = max(0, min(value, n-1)) value = min(max(0, value), max_value)
bs = struct.pack('<I', value) data = struct.pack('<I', value)
elif sign == 'S': elif sign == 'S':
n = 1 << (width - 1) max_value = (1 << (width - 1))
value = max(-n, min(value, n-1)) value = min(max(-max_value, value), max_value - 1)
bs = struct.pack('<i', value) data = struct.pack('<i', value)
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
return bs[:width//8] return data[:width//8]
def decode(self, bs): def decode(self, data):
if len(bs) < 4: if len(data) < 4:
bs = bs + bytes(4 - len(bs)) data = data + bytes(4 - len(data))
sign = self.dtype[0] sign = self.dtype[0]
if sign == 'U': if sign == 'U':
value = struct.unpack('<I', bs)[0] value = struct.unpack('<I', data)[0]
elif sign == 'S': elif sign == 'S':
value = struct.unpack('<i', bs)[0] value = struct.unpack('<i', data)[0]
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
if '.' in self.dtype: if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1)) fraction = int(self.dtype.split('.', 1)[1])
value = value / (1 << fraction) value /= (1 << fraction)
return value return value
@property @property
@ -69,15 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0 whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S': if self.dtype[0] == 'S':
whole -= 1 whole -= 1
n = (1 << (whole+fraction)) - 1 max_value = (1 << (whole+fraction)) - 1
return n / (1 << fraction) if fraction else n return max_value / (1 << fraction) if fraction else max_value
@property @property
def width(self): def width(self):
if '.' in self.dtype: if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8 return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else: return int(self.dtype[1:]) // 8
return int(self.dtype[1:]) // 8
Registers = DotDict({ Registers = DotDict({
@ -90,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"), "TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"), "TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"), "TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"), "ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"), "ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"), "TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"), "TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"), "TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -151,13 +151,11 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"), "Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"), "Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"), "Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"), "PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"), "PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
}) })
# pylama:ignore=E221
class TraceMode(IntEnum): class TraceMode(IntEnum):
Analog = 0 Analog = 0
Mixed = 1 Mixed = 1
@ -254,12 +252,12 @@ class VirtualMachine:
class Transaction: class Transaction:
def __init__(self, vm): def __init__(self, vm):
self._vm = vm self._vm = vm
self._data = b''
def append(self, cmd): def append(self, cmd):
self._data += cmd self._data += cmd
async def __aenter__(self): async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self) self._vm._transactions.append(self)
return self return self
@ -281,6 +279,8 @@ class VirtualMachine:
self._writer.close() self._writer.close()
self._writer = None self._writer = None
self._reader = None self._reader = None
return True
return False
__del__ = close __del__ = close
@ -300,12 +300,12 @@ class VirtualMachine:
else: else:
self._transactions[-1].append(cmd) self._transactions[-1].append(cmd)
async def read_replies(self, n): async def read_replies(self, nreplies):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
replies = [] replies = []
data, self._reply_buffer = self._reply_buffer, b'' data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n: while len(replies) < nreplies:
index = data.find(b'\r') index = data.find(b'\r')
if index >= 0: if index >= 0:
reply = data[:index] reply = data[:index]
@ -331,25 +331,25 @@ class VirtualMachine:
async def set_registers(self, **kwargs): async def set_registers(self, **kwargs):
cmd = '' cmd = ''
r0 = r1 = None register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs): for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name] register = Registers[name]
bs = register.encode(kwargs[name]) data = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}") Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(bs): for i, byte in enumerate(data):
if cmd: if cmd:
cmd += 'z' cmd += 'z'
r1 += 1 register1 += 1
address = base + i address = base + i
if r1 is None or address > r1 + 3: if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@' cmd += f'{address:02x}@'
r0 = r1 = address register0 = register1 = address
else: else:
cmd += 'n' * (address - r1) cmd += 'n' * (address - register1)
r1 = address register1 = address
if byte != r0: if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}' cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte register0 = byte
if cmd: if cmd:
await self.issue(cmd + 's') await self.issue(cmd + 's')
@ -382,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self): async def issue_triggered_trace(self):
await self.issue(b'D') await self.issue(b'D')
async def read_analog_samples(self, n, sample_width): async def read_analog_samples(self, nsamples, sample_width):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
if sample_width == 2: if sample_width == 2:
data = await self._reader.readexactly(2*n) data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data))) return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1: if sample_width == 1:
data = await self._reader.readexactly(n) data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data)) return array.array('f', (value/256 for value in data))
else: raise ValueError(f"Bad sample width: {sample_width}")
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n): async def read_logic_samples(self, nsamples):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self): async def issue_cancel_trace(self):
await self.issue(b'K') await self.issue(b'K')
@ -411,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self): async def issue_wavetable_read(self):
await self.issue(b'R') await self.issue(b'R')
async def wavetable_read_bytes(self, n): async def wavetable_read_bytes(self, nbytes):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs): async def wavetable_write_bytes(self, data):
cmd = '' cmd = ''
last_byte = None last_byte = None
for byte in bs: for byte in data:
if byte != last_byte: if byte != last_byte:
cmd += f'{byte:02x}' cmd += f'{byte:02x}'
cmd += 'W' cmd += 'W'