1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 03:02:09 +01:00

Load/save analog parameters to a config file; more debug logging; quicker clock divider determination in capturing; quicker selection of clock for waveform generation; documented calibration; removed dodgy/unused scope methods; fixed sample scaling; fixes for using scope over network connection; new URL-based connection method

This commit is contained in:
2018-07-03 18:54:36 +01:00
parent bbc7596292
commit 352e3e65f5
3 changed files with 150 additions and 100 deletions

194
scope.py
View File

@ -4,16 +4,21 @@ import argparse
import array
import asyncio
from collections import namedtuple
from configparser import ConfigParser
import logging
import math
import os
from pathlib import Path
import sys
from urllib.parse import urlparse
import streams
import vm
LOG = logging.getLogger('scope')
LOG = logging.getLogger(__name__)
ANALOG_PARAMETERS_PATH = Path('~/.config/scopething/analog.conf').expanduser()
class UsageError(Exception):
@ -28,36 +33,39 @@ class Scope(vm.VirtualMachine):
AnalogParams = namedtuple('AnalogParams', ['la', 'lb', 'lc', 'ha', 'hb', 'hc', 'scale', 'offset', 'safe_low', 'safe_high', 'ab_offset'])
@classmethod
async def connect(cls, device=None):
if device is None:
reader = writer = streams.SerialStream.stream_matching(0x0403, 0x6001)
elif os.path.exists(device):
reader = writer = streams.SerialStream(device=device)
elif ':' in device:
host, port = device.split(':', 1)
LOG.info(f"Connecting to remote scope at {host}:{port}")
async def connect(cls, url=None):
if url is None:
port = next(streams.SerialStream.ports_matching(vid=0x0403, pid=0x6001))
url = f'file:{port.device}'
LOG.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file')
if parts.scheme == 'file':
reader = writer = streams.SerialStream(device=parts.path)
elif parts.scheme == 'socket':
host, port = parts.netloc.split(':', 1)
reader, writer = await asyncio.open_connection(host, int(port))
else:
raise ValueError(f"Don't know what to do with {device!r}")
raise ValueError(f"Don't know what to do with url: {url}")
scope = cls(reader, writer)
await scope.setup()
scope.url = url
await scope.reset()
return scope
async def setup(self):
async def reset(self):
LOG.info("Resetting scope")
await self.reset()
await self.issue_reset()
await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501':
self.master_clock_period = 25e-9
self.master_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate
self.capture_buffer_size = 12<<10
self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024
self.awg_minimum_clock = 33
self.logic_low = 0
self.awg_maximum_voltage = self.clock_voltage = self.logic_high = 3.3
self.analog_params = {'x1': self.AnalogParams(1.11, -6.57e-2, 8.46e-3, 1.11, -7.32e-2, -5.19e-2, 18.28, -7.45, -5.5, 8, 5.3e-3),
'x10': self.AnalogParams(1.10, -6.11e-2, 8.61e-3, 1.10, -6.68e-2, -4.32e-2, 184.3, -90.4, -71.3, 65.7, 175e-3)}
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.3, -7.50, -5.5, 8, 0)}
self.analog_lo_min = 0.07
self.analog_hi_max = 0.88
self.timeout_clock_period = (1<<8) * self.master_clock_period
@ -66,8 +74,32 @@ class Scope(vm.VirtualMachine):
raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False
self._clock_running = False
self.load_analog_params()
LOG.info(f"Initialised scope, revision: {revision}")
def load_analog_params(self):
config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH)
analog_params = {}
for url in config.sections():
if url == self.url:
for probes in config[url]:
analog_params[probes] = self.AnalogParams(*map(float, config[url][probes].split()))
if analog_params:
self.analog_params.update(analog_params)
LOG.info(f"Loaded analog parameters for probes: {', '.join(analog_params.keys())}")
def save_analog_params(self):
LOG.info("Saving analog parameters")
config = ConfigParser()
config.read(ANALOG_PARAMETERS_PATH)
config[self.url] = {probes: ' '.join(map(str, self.analog_params[probes])) for probes in self.analog_params}
parent = ANALOG_PARAMETERS_PATH.parent
if not parent.is_dir():
parent.mkdir(parents=True)
with open(ANALOG_PARAMETERS_PATH, 'w') as parameters_file:
config.write(parameters_file)
def __enter__(self):
return self
@ -119,40 +151,49 @@ class Scope(vm.VirtualMachine):
analog_enable = sum(1<<(ord(channel)-ord('A')) for channel in analog_channels)
logic_enable = sum(1<<channel for channel in logic_channels)
ticks = int(round(period / nsamples / self.master_clock_period))
for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples))
clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
if ticks > capture_mode.clock_high and capture_mode.clock_divide:
for clock_scale in range(2, vm.Registers.ClockScale.maximum_value+1):
test_ticks = int(round(period / nsamples / self.master_clock_period / clock_scale))
if test_ticks in range(capture_mode.clock_low, capture_mode.clock_high + 1):
ticks = test_ticks
break
LOG.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1):
LOG.debug(f"- try with tick count {ticks} x {clock_scale}")
else:
continue
break
elif ticks >= capture_mode.clock_low:
clock_scale = 1
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
LOG.debug(f"- try with tick count {ticks}")
else:
LOG.debug(f"- mode too slow")
continue
n = int(round(period / ticks / self.master_clock_period / clock_scale))
n = int(round(period / self.master_clock_period / ticks / clock_scale))
if len(analog_channels) == 2:
n -= n % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels:
buffer_width //= 2
if n <= buffer_width:
LOG.debug(f"- OK; period is {n} samples")
nsamples = n
break
LOG.debug(f"- insufficient buffer space for necessary {n} samples")
else:
raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period
sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6:
LOG.warn(f"Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0
analog_params = self.analog_params[probes]
if raw:
analog_params = None
lo, hi = low, high
else:
analog_params = self.analog_params[probes]
if low is None:
low = analog_params.safe_low if analog_channels else self.logic_low
elif low < analog_params.safe_low:
@ -175,7 +216,8 @@ class Scope(vm.VirtualMachine):
kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable
if trigger_level is None:
trigger_level = (high + low) / 2
trigger_level = (trigger_level - analog_params.offset) / analog_params.scale
if not raw:
trigger_level = (trigger_level - analog_params.offset) / analog_params.scale
if trigger == 'A' or trigger == 'B':
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
@ -219,8 +261,6 @@ class Scope(vm.VirtualMachine):
else:
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
sample_period = ticks*clock_scale*self.master_clock_period
sample_rate = 1/sample_period
LOG.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
async with self.transaction():
@ -235,6 +275,7 @@ class Scope(vm.VirtualMachine):
await self.issue_program_spock_registers()
await self.issue_configure_device_hardware()
await self.issue_triggered_trace()
begin_timestamp = None
while True:
try:
code, timestamp = (int(x, 16) for x in await self.read_replies(2))
@ -297,32 +338,36 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high:
raise ValueError("low out of range (0-high)")
possible_params = []
max_clock = int(math.floor(1 / frequency / min_samples / self.master_clock_period))
for clock in range(self.awg_minimum_clock, max_clock+1):
width = 1 / frequency / (clock * self.master_clock_period)
if width <= self.awg_sample_buffer_size:
nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width))
width = size / nwaves
actualf = 1 / (width * clock * self.master_clock_period)
error = abs(frequency - actualf) / frequency
if error < max_error:
possible_params.append((width if error == 0 else -error, (size, nwaves, clock, actualf)))
if not possible_params:
raise ConfigurationError("No solution to required frequency/min_samples/max_error")
size, nwaves, clock, actualf = sorted(possible_params)[-1][1]
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None
for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock
if actualf == frequency:
LOG.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break
error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]):
best_solution = error, size, nwaves, clock, actualf
else:
if best_solution is None:
raise ConfigurationError("No solution to required frequency/min_samples/max_error")
error, size, nwaves, clock, actualf = best_solution
LOG.debug(f"Best solution: size={size} nwaves={nwaves} clock={clock} actualf={actualf}")
async with self.transaction():
if isinstance(waveform, str):
mode = {'sine': 0, 'triangle': 1, 'exponential': 2, 'square': 3}[waveform.lower()]
await self.set_registers(Cmd=0, Mode=mode, Ratio=ratio)
await self.issue_synthesize_wavetable()
elif len(wavetable) == self.awg_wavetable_size:
wavetable = bytes(min(max(0, int(round(y*255))),255) for y in wavetable)
wavetable = bytes(min(max(0, int(round(y*256))), 255) for y in wavetable)
await self.set_registers(Cmd=0, Mode=1, Address=0, Size=1)
await self.wavetable_write_bytes(wavetable)
else:
raise ValueError(f"waveform must be a valid name or {self.awg_wavetable_size} samples")
raise ValueError(f"waveform must be a valid name or a sequence of {self.awg_wavetable_size} samples [0,1)")
async with self.transaction():
offset = (high+low)/2 - self.awg_maximum_voltage/2
await self.set_registers(Cmd=0, Mode=0, Level=(high-low)/self.awg_maximum_voltage,
@ -355,9 +400,9 @@ class Scope(vm.VirtualMachine):
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(1 / frequency / self.master_clock_period))), vm.Registers.Clock.maximum_value)
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = 1 / ticks / self.master_clock_period, fall / ticks
actualf, actualr = self.master_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction():
@ -376,26 +421,30 @@ class Scope(vm.VirtualMachine):
LOG.info("Clock generator stopped")
self._clock_running = False
async def read_wavetable(self):
with self.transaction():
self.set_registers(Address=0, Size=self.awg_wavetable_size)
self.issue_wavetable_read()
return list(self.wavetable_read_bytes(self.awg_wavetable_size))
async def calibrate(self, probes='x1', n=32, save=True):
"""
Derive values for the analogue parameters based on generating a 3.3V 10kHz clock
signal and then sampling the analogue channels to measure this. The first step is
to set the low and high range DACs to 1/3 and 2/3, respectively. This results in
*neutral* voltages matching the three series 300Ω resistances created by the ADC
ladder resistance and the upper and lower bias resistors. Thus no current should
be flowing in or out of the DACs and their effect on the ADC range voltages can
be ignored. This allows an initial measurement to determine the full analogue
range and zero offset.
async def read_eeprom(self, address):
async with self.transaction():
await self.set_registers(EepromAddress=address)
await self.issue_read_eeprom()
return int((await self.read_replies(2))[1], 16)
After this initial measurement, an `n`x`n` matrix of measurements are taken with
different `lo` and `hi` DAC input values and these are used, with the known clock
voltage, to reverse out the actual `low` and `high` measurement voltage range.
The full set of measurements are then fed into the SciPy SLSQP minimiser to find
parameters for two plane functions mapping the `low` and `high` voltages to the
necessary `lo` and `hi` DAC values to achieve these. (Note that these functions
are constrained to ensure that they pass through the *neutral* points.
async def write_eeprom(self, address, byte):
async with self.transaction():
await self.set_registers(EepromAddress=address, EepromData=byte)
await self.issue_write_eeprom()
if int((await self.read_replies(2))[1], 16) != byte:
raise RuntimeError("Error writing EEPROM byte")
async def calibrate(self, probes='x1', n=32):
A further minimisation step is done to determine the safe analogue range based
on the observed linear range of the DACs (`self.analog_lo_min` to
`self.analog_hi_max`). The mean of the measured offsets between the A and B
channel readings are used to determine an AB offset.
"""
import numpy as np
from scipy.optimize import minimize
items = []
@ -418,7 +467,7 @@ class Scope(vm.VirtualMachine):
full = (full + 1) / 3
analog_scale = self.clock_voltage / (full - zero)
analog_offset = -zero * analog_scale
LOG.info(f"Analog full range = {analog_scale:.1f}V, zero offset = {analog_offset:.1f}V")
LOG.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V")
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n):
period = 2e-3 if len(items) % 4 < 2 else 1e-3
@ -427,8 +476,7 @@ class Scope(vm.VirtualMachine):
analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock()
items = np.array(items).T
lo, hi, low, high, offset = items
lo, hi, low, high, offset = np.array(items).T
def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
return np.sqrt((lo-dl)**2 + (hi-dh)**2).mean()
@ -455,6 +503,8 @@ class Scope(vm.VirtualMachine):
lo_error = np.sqrt((((clo-lo)/(hi-lo))**2).mean())
hi_error = np.sqrt((((chi-hi)/(hi-lo))**2).mean())
LOG.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps")
if save:
self.save_analog_params()
else:
LOG.warning(f"Calibration failed: {result.message}")
return result.success
@ -485,12 +535,12 @@ In [6]:
async def main():
global s
parser = argparse.ArgumentParser(description="scopething")
parser.add_argument('device', nargs='?', default=None, type=str, help="Device to connect to")
parser.add_argument('url', nargs='?', default=None, type=str, help="Device to connect to")
parser.add_argument('--debug', action='store_true', default=False, help="Debug logging")
parser.add_argument('--verbose', action='store_true', default=False, help="Verbose logging")
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.debug else (logging.INFO if args.verbose else logging.WARNING), stream=sys.stdout)
s = await Scope.connect(args.device)
s = await Scope.connect(args.url)
def await_(g):
task = asyncio.Task(g)