1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 03:02:09 +01:00

Revised analog params; move VM-specific IO routines into vm.py; remove EEPROM params code (doesn't work); a few lines of documentation here and there

This commit is contained in:
Jonathan Hogg
2017-07-11 11:33:49 +01:00
parent 942c7e7dbc
commit be7e7e97c1
3 changed files with 80 additions and 52 deletions

View File

@ -2,10 +2,10 @@
import argparse
import asyncio
from collections import namedtuple
import logging
import math
import os
import struct
import sys
import streams
@ -25,6 +25,8 @@ class Scope(vm.VirtualMachine):
PARAMS_MAGIC = 0xb0b2
AnalogParams = namedtuple('AnalogParams', ['d', 'f', 'b', 'scale', 'offset'])
@classmethod
async def connect(cls, device=None):
if device is None:
@ -52,16 +54,15 @@ class Scope(vm.VirtualMachine):
self.awg_sample_buffer_size = 1024
self.awg_minimum_clock = 33
self.awg_maximum_voltage = 3.33
self.analog_params = (20.164, -5.2470, 299.00, 18.472, 0.40827)
self.analog_offsets = {'A': -0.011924, 'B': 0.011924}
self.analog_params = self.AnalogParams(20.2, -4.9, 300, 18.333, -7.517)
self.analog_offsets = {'A': 0, 'B': 0}
self.analog_min = -5.7
self.analog_max = 8
self.capture_clock_period = 25e-9
self.capture_buffer_size = 12<<10
self.timeout_clock_period = 6.4e-6
self.trigger_low = -7.517
self.trigger_high = 10.816
# await self.load_params() XXX switch this off until I understand EEPROM better
else:
raise RuntimeError(f"Unsupported scope revision: {revision}")
self._awg_running = False
Log.info(f"Initialised scope, revision: {revision}")
@ -73,32 +74,14 @@ class Scope(vm.VirtualMachine):
__del__ = close
async def load_params(self):
params = []
for i in range(struct.calcsize('<H8fH')):
params.append(await self.read_eeprom(i+70))
params = struct.unpack('<H8fH', bytes(params))
if params[0] == self.PARAMS_MAGIC and params[-1] == self.PARAMS_MAGIC:
self.analog_params = tuple(params[1:7])
self.analog_offsets['A'] = params[8]
self.analog_offsets['B'] = params[9]
async def save_params(self):
params = struct.pack('<H8fH', self.PARAMS_MAGIC, *self.analog_params,
self.analog_offsets['A'], self.analog_offsets['B'], self.PARAMS_MAGIC)
for i, byte in enumerate(params):
await self.write_eeprom(i+70, byte)
def calculate_lo_hi(self, low, high, params=None):
if params is None:
params = self.analog_params
d, f, b, scale, offset = params
l = low / scale + offset
h = high / scale + offset
al = d + f * (2*l - 1)**2
ah = d + f * (2*h - 1)**2
dl = (l*(2*al + b) - al*h) / b
dh = (h*(2*ah + b) - ah*(l + 1)) / b
params = self.analog_params if params is None else self.AnalogParams(*params)
l = (low - params.offset) / params.scale
h = (high - params.offset) / params.scale
al = params.d + params.f * (2*l-1)**2
ah = params.d + params.f * (2*h-1)**2
dl = l + (2*l-h)*al/params.b
dh = h + (2*h-l-1)*ah/params.b
return dl, dh
async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False,
@ -183,7 +166,7 @@ class Scope(vm.VirtualMachine):
kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable
if trigger_level is None:
trigger_level = (high + low) / 2
trigger_level = (trigger_level - self.trigger_low) / (self.trigger_high - self.trigger_low)
trigger_level = (trigger_level - self.analog_params.offset) / self.analog_params.scale
if trigger == 'A' or trigger == 'B':
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
@ -245,13 +228,9 @@ class Scope(vm.VirtualMachine):
DumpChan=dump_channel, DumpCount=asamples, DumpRepeat=1, DumpSend=1, DumpSkip=0)
await self.issue_program_spock_registers()
await self.issue_analog_dump_binary()
data = await self._reader.readexactly(asamples * capture_mode.sample_width)
value_multiplier, value_offset = (1, 0) if raw else ((high-low), low+self.analog_offsets[channel])
if capture_mode.sample_width == 2:
data = struct.unpack(f'>{asamples}h', data)
data = ((value/65536+0.5)*value_multiplier + value_offset for value in data)
else:
data = ((value/256)*value_multiplier + value_offset for value in data)
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
data = (value*value_multiplier + value_offset for value in data)
ts = (t*self.capture_clock_period for t in range(start_timestamp+dump_channel*ticks*clock_scale, timestamp,
ticks*clock_scale*len(analog_channels)))
traces[channel] = dict(zip(ts, data))
@ -262,7 +241,7 @@ class Scope(vm.VirtualMachine):
DumpMode=vm.DumpMode.Raw, DumpChan=128, DumpCount=nsamples, DumpRepeat=1, DumpSend=1, DumpSkip=0)
await self.issue_program_spock_registers()
await self.issue_analog_dump_binary()
data = await self._reader.readexactly(nsamples)
data = await self.read_logic_samples(nsamples)
ts = [t*self.capture_clock_period for t in range(start_timestamp, timestamp, ticks*clock_scale)]
for i in logic_channels:
mask = 1<<i
@ -326,7 +305,7 @@ class Scope(vm.VirtualMachine):
with self.transaction():
self.set_registers(Address=0, Size=self.awg_wavetable_size)
self.issue_wavetable_read()
return list(self.read_exactly(self.awg_wavetable_size))
return list(self.wavetable_read_bytes(self.awg_wavetable_size))
async def read_eeprom(self, address):
async with self.transaction():
@ -344,6 +323,7 @@ class Scope(vm.VirtualMachine):
async def calibrate(self, n=32):
import numpy as np
from scipy.optimize import least_squares
global items
items = []
await self.start_generator(frequency=1000, waveform='square')
i = 0
@ -362,7 +342,7 @@ class Scope(vm.VirtualMachine):
analog_range = self.awg_maximum_voltage / ((Amax + Bmax)/2 - zero)
analog_low = -zero * analog_range
analog_high = analog_low + analog_range
offset = (Azero - Bzero) / 2 * analog_range
offset = ((Amax - Bmax) + (Azero - Bzero))/2 * analog_range
items.append((analog_low, analog_high, low, high, offset))
i += 1
await self.stop_generator()
@ -370,11 +350,16 @@ class Scope(vm.VirtualMachine):
def f(params, analog_low, analog_high, low, high):
lo, hi = self.calculate_lo_hi(analog_low, analog_high, params)
return np.sqrt((low - lo) ** 2 + (high - hi) ** 2)
result = least_squares(f, self.analog_params, args=items.T[:4], bounds=([0, -np.inf, 250, 0, 0], [np.inf, np.inf, 350, np.inf, np.inf]))
result = least_squares(f, self.analog_params, args=items.T[:4], max_nfev=10000,
bounds=([0, -np.inf, 285, 17.4, -7.89], [np.inf, np.inf, 315, 19.2, -7.14]))
if result.success in range(1, 5):
self.analog_params = tuple(result.x)
offset = items[:,4].mean()
self.analog_offsets = {'A': -offset, 'B': +offset}
Log.info(f"Calibration succeeded: {result.success}")
self.analog_params = params = self.AnalogParams(*result.x)
Log.info(f"Analog parameters: d={params.d:.1f}Ω f={params.f:.2f}Ω b={params.b:.1f}Ω scale={params.scale:.3f}V offset={params.offset:.3f}V")
offsets = items[:,4]
offset = offsets.mean()
Log.info(f"Mean A-B offset is {offset*1000:.1f}mV (+/- {100*offsets.std()/offset:.1f}%)")
self.analog_offsets = {'A': -offset/2, 'B': +offset/2}
else:
Log.warning(f"Calibration failed: {result.message}")
return result.success