mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-14 11:12:09 +01:00
New hi/lo model; improved timeout logic; new generator low/high logic; fixed scaling of samples
This commit is contained in:
89
scope.py
89
scope.py
@ -25,7 +25,7 @@ class Scope(vm.VirtualMachine):
|
|||||||
|
|
||||||
PARAMS_MAGIC = 0xb0b2
|
PARAMS_MAGIC = 0xb0b2
|
||||||
|
|
||||||
AnalogParams = namedtuple('AnalogParams', ['d', 'f', 'b', 'scale', 'offset'])
|
AnalogParams = namedtuple('AnalogParams', ['rd', 'rr', 'rt', 'rb', 'scale', 'offset'])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def connect(cls, device=None):
|
async def connect(cls, device=None):
|
||||||
@ -53,9 +53,9 @@ class Scope(vm.VirtualMachine):
|
|||||||
self.awg_wavetable_size = 1024
|
self.awg_wavetable_size = 1024
|
||||||
self.awg_sample_buffer_size = 1024
|
self.awg_sample_buffer_size = 1024
|
||||||
self.awg_minimum_clock = 33
|
self.awg_minimum_clock = 33
|
||||||
self.awg_maximum_voltage = 3.33
|
self.awg_maximum_voltage = 3.3
|
||||||
self.analog_params = self.AnalogParams(20, -5, 300, 18.36, -7.5)
|
self.analog_params = self.AnalogParams(20, 300, 335, 355, 18.5, -7.585)
|
||||||
self.analog_offsets = {'A': 0, 'B': 0}
|
self.analog_offsets = {'A': -9.5e-3, 'B': 9.5e-3}
|
||||||
self.analog_default_low = -5.5
|
self.analog_default_low = -5.5
|
||||||
self.analog_default_high = 8
|
self.analog_default_high = 8
|
||||||
self.analog_lo_min = 0.07
|
self.analog_lo_min = 0.07
|
||||||
@ -80,10 +80,8 @@ class Scope(vm.VirtualMachine):
|
|||||||
params = self.analog_params if params is None else self.AnalogParams(*params)
|
params = self.analog_params if params is None else self.AnalogParams(*params)
|
||||||
l = (low - params.offset) / params.scale
|
l = (low - params.offset) / params.scale
|
||||||
h = (high - params.offset) / params.scale
|
h = (high - params.offset) / params.scale
|
||||||
al = params.d + params.f*(2*l-1)**2
|
dl = l - params.rd*(h-l)/params.rr + params.rd*l/params.rb
|
||||||
ah = params.d + params.f*(2*h-1)**2
|
dh = h + params.rd*(h-l)/params.rr - params.rd*(1-h)/params.rt
|
||||||
dl = l - al*(h-2*l)/params.b
|
|
||||||
dh = h + ah*(2*h-l-1)/params.b
|
|
||||||
return dl, dh
|
return dl, dh
|
||||||
|
|
||||||
async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False,
|
async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False,
|
||||||
@ -152,11 +150,13 @@ class Scope(vm.VirtualMachine):
|
|||||||
else:
|
else:
|
||||||
if low is None:
|
if low is None:
|
||||||
low = self.analog_default_low
|
low = self.analog_default_low
|
||||||
|
elif low < self.analog_default_low:
|
||||||
|
Log.warning(f"Voltage range is below safe minimum: {low} < {self.analog_default_low}")
|
||||||
if high is None:
|
if high is None:
|
||||||
high = self.analog_default_high
|
high = self.analog_default_high
|
||||||
|
elif high > self.analog_default_high:
|
||||||
|
Log.warning(f"Voltage range is above safe maximum: {high} > {self.analog_default_high}")
|
||||||
lo, hi = self.calculate_lo_hi(low, high)
|
lo, hi = self.calculate_lo_hi(low, high)
|
||||||
if lo < self.analog_lo_min or hi > self.analog_hi_max:
|
|
||||||
Log.warning(f"Reference voltage DAC(s) out of safe range: lo={lo:.3f} hi={hi:.3f}")
|
|
||||||
|
|
||||||
spock_option = vm.SpockOption.TriggerTypeHardwareComparator
|
spock_option = vm.SpockOption.TriggerTypeHardwareComparator
|
||||||
kitchen_sink_a = kitchen_sink_b = 0
|
kitchen_sink_a = kitchen_sink_b = 0
|
||||||
@ -197,7 +197,7 @@ class Scope(vm.VirtualMachine):
|
|||||||
if timeout is None:
|
if timeout is None:
|
||||||
trigger_timeout = 0
|
trigger_timeout = 0
|
||||||
else:
|
else:
|
||||||
trigger_timeout = max(1, int(math.ceil(((trigger_outro+trace_outro)*ticks*clock_scale*self.capture_clock_period
|
trigger_timeout = max(1, int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.capture_clock_period
|
||||||
+ timeout)/self.timeout_clock_period)))
|
+ timeout)/self.timeout_clock_period)))
|
||||||
|
|
||||||
async with self.transaction():
|
async with self.transaction():
|
||||||
@ -253,10 +253,14 @@ class Scope(vm.VirtualMachine):
|
|||||||
|
|
||||||
return traces
|
return traces
|
||||||
|
|
||||||
async def start_generator(self, frequency, waveform='sine', wavetable=None, ratio=0.5, vpp=None, offset=0,
|
async def start_generator(self, frequency, waveform='sine', wavetable=None, ratio=0.5,
|
||||||
min_samples=50, max_error=1e-4):
|
low=0, high=None, min_samples=50, max_error=1e-4):
|
||||||
if vpp is None:
|
if high is None:
|
||||||
vpp = self.awg_maximum_voltage
|
high = self.awg_maximum_voltage
|
||||||
|
elif high < 0 or high > self.awg_maximum_voltage:
|
||||||
|
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
|
||||||
|
if low < 0 or low > high:
|
||||||
|
raise ValueError("offset out of range (0-high)")
|
||||||
possible_params = []
|
possible_params = []
|
||||||
max_clock = int(math.floor(1 / frequency / min_samples / self.awg_clock_period))
|
max_clock = int(math.floor(1 / frequency / min_samples / self.awg_clock_period))
|
||||||
for clock in range(self.awg_minimum_clock, max_clock+1):
|
for clock in range(self.awg_minimum_clock, max_clock+1):
|
||||||
@ -283,7 +287,8 @@ class Scope(vm.VirtualMachine):
|
|||||||
raise ValueError(f"Wavetable data must be {self.awg_wavetable_size} samples")
|
raise ValueError(f"Wavetable data must be {self.awg_wavetable_size} samples")
|
||||||
await self.set_registers(Cmd=0, Mode=1, Address=0, Size=1)
|
await self.set_registers(Cmd=0, Mode=1, Address=0, Size=1)
|
||||||
await self.wavetable_write_bytes(wavetable)
|
await self.wavetable_write_bytes(wavetable)
|
||||||
await self.set_registers(Cmd=0, Mode=0, Level=vpp/self.awg_maximum_voltage,
|
offset = (high+low)/2 - self.awg_maximum_voltage/2
|
||||||
|
await self.set_registers(Cmd=0, Mode=0, Level=(high-low)/self.awg_maximum_voltage,
|
||||||
Offset=offset/self.awg_maximum_voltage,
|
Offset=offset/self.awg_maximum_voltage,
|
||||||
Ratio=nwaves*self.awg_wavetable_size/size,
|
Ratio=nwaves*self.awg_wavetable_size/size,
|
||||||
Index=0, Address=0, Size=size)
|
Index=0, Address=0, Size=size)
|
||||||
@ -329,42 +334,44 @@ class Scope(vm.VirtualMachine):
|
|||||||
from scipy.optimize import least_squares
|
from scipy.optimize import least_squares
|
||||||
items = []
|
items = []
|
||||||
await self.start_generator(frequency=1000, waveform='square')
|
await self.start_generator(frequency=1000, waveform='square')
|
||||||
i = 0
|
|
||||||
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
|
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
|
||||||
for hi in np.linspace(0.5, self.analog_hi_max, n, endpoint=False):
|
for hi in np.linspace(0.5, self.analog_hi_max, n, endpoint=False):
|
||||||
data = await self.capture(channels=['A','B'], period=2e-3 if i%2 == 0 else 1e-3, nsamples=2000, low=lo, high=hi, timeout=0, raw=True)
|
data = await self.capture(channels=['A','B'], period=2e-3, nsamples=2000, timeout=0, low=lo, high=hi, raw=True)
|
||||||
A = np.fromiter(data['A'].values(), dtype='float')
|
A = np.fromiter(data['A'].values(), count=1000, dtype='float')
|
||||||
A.sort()
|
A.sort()
|
||||||
B = np.fromiter(data['B'].values(), dtype='float')
|
|
||||||
B.sort()
|
|
||||||
Azero, Amax = A[25:475].mean(), A[525:975].mean()
|
Azero, Amax = A[25:475].mean(), A[525:975].mean()
|
||||||
|
if Azero < 0.01 or Amax > 0.99:
|
||||||
|
continue
|
||||||
|
B = np.fromiter(data['B'].values(), count=1000, dtype='float')
|
||||||
|
B.sort()
|
||||||
Bzero, Bmax = B[25:475].mean(), B[525:975].mean()
|
Bzero, Bmax = B[25:475].mean(), B[525:975].mean()
|
||||||
if Azero > 0.05 and Bzero > 0.05 and Amax < 0.95 and Bmax < 0.95:
|
if Bzero < 0.01 or Bmax > 0.99:
|
||||||
zero = (Azero + Bzero) / 2
|
continue
|
||||||
analog_range = self.awg_maximum_voltage / ((Amax + Bmax)/2 - zero)
|
zero = (Azero + Bzero) / 2
|
||||||
low = -zero * analog_range
|
analog_range = self.awg_maximum_voltage / ((Amax + Bmax)/2 - zero)
|
||||||
high = low + analog_range
|
low = -zero * analog_range
|
||||||
offset = ((Amax - Bmax) + (Azero - Bzero))/2 * analog_range
|
high = low + analog_range
|
||||||
items.append((low, high, lo, hi, offset))
|
offset = ((Amax - Bmax) + (Azero - Bzero))/2 * analog_range
|
||||||
i += 1
|
items.append((lo, hi, low, high, offset))
|
||||||
await self.stop_generator()
|
await self.stop_generator()
|
||||||
items = np.array(items)
|
items = np.array(items).T
|
||||||
def f(params, low, high, lo, hi):
|
def f(params, lo, hi, low, high, offset):
|
||||||
clo, chi = self.calculate_lo_hi(low, high, params)
|
clo, chi = self.calculate_lo_hi(low, high, params)
|
||||||
return np.sqrt((lo-clo)**2 + (hi-chi)**2)
|
return np.sqrt((lo-clo)**2 + (hi-chi)**2)
|
||||||
result = least_squares(f, self.analog_params, args=items.T[:4], max_nfev=1000, ftol=1e-9, xtol=1e-9,
|
result = least_squares(f, self.analog_params, args=items, bounds=([0, 200, 200, 200, 18, -8], [50, 400, 400, 400, 19, -7]))
|
||||||
bounds=([0, -np.inf, 200, 0, -np.inf], [np.inf, np.inf, 400, np.inf, 0]))
|
|
||||||
if result.success:
|
if result.success:
|
||||||
Log.info(f"Calibration succeeded: {result.message}")
|
Log.info(f"Calibration succeeded: {result.message}")
|
||||||
params = self.analog_params = self.AnalogParams(*result.x)
|
params = self.analog_params = self.AnalogParams(*result.x)
|
||||||
Log.info(f"Analog parameters: d={params.d:.1f}Ω f={params.f:.2f}Ω b={params.b:.1f}Ω scale={params.scale:.3f}V offset={params.offset:.3f}V")
|
Log.info(f"Analog parameters: rd={params.rd:.1f}Ω rr={params.rr:.1f}Ω rt={params.rt:.1f} rb={params.rb:.1f}"
|
||||||
clow, chigh = self.calculate_lo_hi(items[:,0], items[:,1])
|
f" scale={params.scale:.3f}V offset={params.offset:.3f}V")
|
||||||
diff = (np.sqrt(((clow-items[:,2])**2).mean()) + np.sqrt(((chigh-items[:,3])**2).mean())) / (items[:,3]-items[:,2]).mean()
|
lo, hi, low, high, offset = items
|
||||||
Log.info(f"Mean error: {diff*10000:.1f}bps")
|
clo, chi = self.calculate_lo_hi(low, high)
|
||||||
offsets = items[:,4]
|
lo_error = np.sqrt((((clo-lo)/(hi-lo))**2).mean())
|
||||||
offset = offsets.mean()
|
hi_error = np.sqrt((((chi-hi)/(hi-lo))**2).mean())
|
||||||
Log.info(f"Mean A-B offset: {offset*1000:.1f}mV (+/- {100*offsets.std()/offset:.1f}%)")
|
Log.info(f"Mean error: lo={lo_error*10000:.1f}bps hi={hi_error*10000:.1f}bps")
|
||||||
self.analog_offsets = {'A': -offset/2, 'B': +offset/2}
|
offset_mean = offset.mean()
|
||||||
|
Log.info(f"Mean A-B offset: {offset_mean*1000:.1f}mV (+/- {100*offset.std()/offset_mean:.1f}%)")
|
||||||
|
self.analog_offsets = {'A': -offset_mean/2, 'B': +offset_mean/2}
|
||||||
else:
|
else:
|
||||||
Log.warning(f"Calibration failed: {result.message}")
|
Log.warning(f"Calibration failed: {result.message}")
|
||||||
return result.success
|
return result.success
|
||||||
|
4
vm.py
4
vm.py
@ -339,10 +339,10 @@ class VirtualMachine:
|
|||||||
if sample_width == 2:
|
if sample_width == 2:
|
||||||
data = await self._reader.readexactly(2 * n)
|
data = await self._reader.readexactly(2 * n)
|
||||||
data = struct.unpack(f'>{n}h', data)
|
data = struct.unpack(f'>{n}h', data)
|
||||||
return [value/65536 + 0.5 for value in data]
|
return [(value+32768)/65535 for value in data]
|
||||||
elif sample_width == 1:
|
elif sample_width == 1:
|
||||||
data = await self._reader.readexactly(n)
|
data = await self._reader.readexactly(n)
|
||||||
return [value/256 for value in data]
|
return [value/255 for value in data]
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Bad sample width: {sample_width}")
|
raise ValueError(f"Bad sample width: {sample_width}")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user