mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-14 03:02:09 +01:00
Scaling using modelling function
This commit is contained in:
59
scope.py
59
scope.py
@ -1,11 +1,15 @@
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import struct
|
||||
|
||||
import streams
|
||||
import vm
|
||||
|
||||
|
||||
Log = logging.getLogger('scope')
|
||||
|
||||
|
||||
class Scope(vm.VirtualMachine):
|
||||
|
||||
PARAMS_MAGIC = 0xb0b2
|
||||
@ -33,9 +37,8 @@ class Scope(vm.VirtualMachine):
|
||||
self.awg_sample_buffer_size = 1024
|
||||
self.awg_minimum_clock = 33
|
||||
self.awg_maximum_voltage = 3.3
|
||||
self.analog_low_ks = (0.43288780392308074, 0.060673410453476399, -0.0036026442646096687)
|
||||
self.analog_high_ks = (0.37518932193108517, -0.0036098646022277164, 0.06072914261381563)
|
||||
self.analog_offsets = {'A': -0.011675230981703359, 'B': 0.011675230981703359}
|
||||
self.analog_params = (18.584, -3.5073, 298.11, 18.253, 0.40815)
|
||||
self.analog_offsets = {'A': -0.011785, 'B': 0.011785}
|
||||
self.analog_min = -5.7
|
||||
self.analog_max = 8
|
||||
self.capture_clock_period = 25e-9
|
||||
@ -45,6 +48,7 @@ class Scope(vm.VirtualMachine):
|
||||
self.trigger_high = 10.816
|
||||
#await self.load_params() XXX switch this off until I understand EEPROM better
|
||||
self._generator_running = False
|
||||
Log.info("Initialised scope, revision: {}".format(revision))
|
||||
|
||||
async def load_params(self):
|
||||
params = []
|
||||
@ -63,6 +67,18 @@ class Scope(vm.VirtualMachine):
|
||||
for i, byte in enumerate(params):
|
||||
await self.write_eeprom(i+70, byte)
|
||||
|
||||
def calculate_lo_hi(self, low, high, params=None):
|
||||
if params is None:
|
||||
params = self.analog_params
|
||||
d, f, b, scale, offset = params
|
||||
l = low / scale + offset
|
||||
h = high / scale + offset
|
||||
al = d + f * (2*l - 1)**2
|
||||
ah = d + f * (2*h - 1)**2
|
||||
dl = (l*(2*al + b) - al*h) / b
|
||||
dh = (h*(2*ah + b) - ah*(l + 1)) / b
|
||||
return dl, dh
|
||||
|
||||
async def capture(self, channels=['A'], trigger_channel=None, trigger_level=0, trigger_type='rising', hair_trigger=False,
|
||||
period=1e-3, nsamples=1000, timeout=None, low=None, high=None, raw=False):
|
||||
if 'A' in channels and 'B' in channels:
|
||||
@ -118,10 +134,14 @@ class Scope(vm.VirtualMachine):
|
||||
total_samples = nsamples * nsamples_multiplier
|
||||
assert total_samples <= buffer_width
|
||||
|
||||
if raw:
|
||||
lo, hi = low, high
|
||||
else:
|
||||
if low is None:
|
||||
low = 0 if raw else self.analog_min
|
||||
low = self.analog_min
|
||||
if high is None:
|
||||
high = 1 if raw else self.analog_max
|
||||
high = self.analog_max
|
||||
lo, hi = self.calculate_lo_hi(low, high)
|
||||
|
||||
if trigger_channel is None:
|
||||
trigger_channel = channels[0]
|
||||
@ -154,8 +174,7 @@ class Scope(vm.VirtualMachine):
|
||||
Timeout=int(round((period*5 if timeout is None else timeout) / self.trigger_timeout_tick)),
|
||||
TriggerMask=0x7f, TriggerLogic=0x80, TriggerLevel=trigger_level, SpockOption=spock_option,
|
||||
TriggerIntro=trigger_intro, TriggerOutro=2 if hair_trigger else 4, Prelude=0,
|
||||
ConverterLo=low if raw else self._analog_map_func(self.analog_low_ks, low, high),
|
||||
ConverterHi=high if raw else self._analog_map_func(self.analog_high_ks, low, high),
|
||||
ConverterLo=lo, ConverterHi=hi,
|
||||
KitchenSinkA=kitchen_sink_a, KitchenSinkB=kitchen_sink_b, AnalogEnable=analog_enable)
|
||||
await self.issue_program_spock_registers()
|
||||
await self.issue_configure_device_hardware()
|
||||
@ -260,9 +279,10 @@ class Scope(vm.VirtualMachine):
|
||||
raise RuntimeError("Error writing EEPROM byte")
|
||||
|
||||
async def calibrate(self, n=33):
|
||||
global data
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from scipy.optimize import leastsq
|
||||
from scipy.optimize import leastsq, least_squares
|
||||
items = []
|
||||
await self.start_generator(1000, waveform='square')
|
||||
for low in np.linspace(0.063, 0.4, n):
|
||||
@ -282,17 +302,18 @@ class Scope(vm.VirtualMachine):
|
||||
items.append({'low': low, 'high': high, 'analog_low': analog_low, 'analog_high': analog_high, 'offset': ABoffset})
|
||||
await self.stop_generator()
|
||||
data = pd.DataFrame(items)
|
||||
analog_low_ks, success1 = leastsq(lambda ks, low, high, y: y - self._analog_map_func(ks, low, high), self.analog_low_ks,
|
||||
args=(data.analog_low, data.analog_high, data.low))
|
||||
analog_high_ks, success2 = leastsq(lambda ks, low, high, y: y - self._analog_map_func(ks, low, high), self.analog_high_ks,
|
||||
args=(data.analog_low, data.analog_high, data.high))
|
||||
if success1 in range(1, 5) and success2 in range(1, 5):
|
||||
self.analog_low_ks = tuple(analog_low_ks)
|
||||
self.analog_high_ks = tuple(analog_high_ks)
|
||||
def f(params, analog_low, analog_high, low, high):
|
||||
lo, hi = self.calculate_lo_hi(analog_low, analog_high, params)
|
||||
return np.sqrt((low - lo) ** 2 + (high - hi) ** 2)
|
||||
result = least_squares(f, self.analog_params, args=(data.analog_low, data.analog_high, data.low, data.high),
|
||||
bounds=([0, -np.inf, 250, 0, 0], [np.inf, np.inf, 350, np.inf, np.inf]))
|
||||
if result.success in range(1, 5):
|
||||
self.analog_params = tuple(result.x)
|
||||
self.analog_offsets = {'A': -data.offset.mean(), 'B': +data.offset.mean()}
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
Log.warning("Calibration failed: {}".format(result.message))
|
||||
print(result.message)
|
||||
return result.success
|
||||
|
||||
|
||||
import numpy as np
|
||||
@ -300,7 +321,7 @@ import pandas as pd
|
||||
|
||||
async def main():
|
||||
global s, x, y, data
|
||||
s = await Scope.connect()
|
||||
s = await Scope.connect(streams.SerialStream(device='/Users/jonathan/test'))
|
||||
x = np.linspace(0, 2*np.pi, s.awg_wavetable_size, endpoint=False)
|
||||
y = np.round((np.sin(x)**5)*127 + 128, 0).astype('uint8')
|
||||
await s.start_generator(1000, wavetable=y)
|
||||
@ -313,6 +334,6 @@ def capture(*args, **kwargs):
|
||||
if __name__ == '__main__':
|
||||
import logging
|
||||
import sys
|
||||
#logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
|
||||
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
|
||||
asyncio.get_event_loop().run_until_complete(main())
|
||||
|
||||
|
@ -13,10 +13,10 @@ class SerialStream:
|
||||
|
||||
@staticmethod
|
||||
def available_ports():
|
||||
return [port.device for port in serial.tools.list_ports.comports()]
|
||||
return [port.device for port in serial.tools.list_ports.comports() if port.usb_description() == 'FT245R USB FIFO']
|
||||
|
||||
def __init__(self, port=-1, loop=None, **kwargs):
|
||||
self._device = self.available_ports()[port]
|
||||
def __init__(self, port=0, device=None, loop=None, **kwargs):
|
||||
self._device = self.available_ports()[port] if device is None else device
|
||||
self._connection = serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
|
||||
self._loop = loop if loop is not None else asyncio.get_event_loop()
|
||||
self._input_buffer = bytes()
|
||||
|
Reference in New Issue
Block a user