#!/usr/bin/env python3 import argparse import asyncio from collections import namedtuple import logging import math import os import sys import streams import vm Log = logging.getLogger('scope') class DotDict(dict): __getattr__ = dict.__getitem__ __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ class Scope(vm.VirtualMachine): PARAMS_MAGIC = 0xb0b2 AnalogParams = namedtuple('AnalogParams', ['d', 'f', 'b', 'scale', 'offset']) @classmethod async def connect(cls, device=None): if device is None: reader = writer = streams.SerialStream.stream_matching(0x0403, 0x6001) elif os.path.exists(device): reader = writer = streams.SerialStream(device=device) elif ':' in device: host, port = device.split(':', 1) Log.info(f"Connecting to remote scope at {host}:{port}") reader, writer = await asyncio.open_connection(host, int(port)) else: raise ValueError(f"Don't know what to do with '{device}'") scope = cls(reader, writer) await scope.setup() return scope async def setup(self): Log.info("Resetting scope") await self.reset() await self.issue_get_revision() revision = ((await self.read_replies(2))[1]).decode('ascii') if revision == 'BS000501': self.awg_clock_period = 25e-9 self.awg_wavetable_size = 1024 self.awg_sample_buffer_size = 1024 self.awg_minimum_clock = 33 self.awg_maximum_voltage = 3.33 self.analog_params = self.AnalogParams(20, -5, 300, 18.36, -7.5) self.analog_offsets = {'A': 0, 'B': 0} self.analog_default_low = -5.5 self.analog_default_high = 8 self.analog_lo_min = 0.07 self.analog_hi_max = 0.88 self.capture_clock_period = 25e-9 self.capture_buffer_size = 12<<10 self.timeout_clock_period = 6.4e-6 else: raise RuntimeError(f"Unsupported scope, revision: {revision}") self._awg_running = False Log.info(f"Initialised scope, revision: {revision}") def close(self): if self._writer is not None: self._writer.close() self._writer = None self._reader = None __del__ = close def calculate_lo_hi(self, low, high, params=None): params = self.analog_params if params is None 
# NOTE(review): this file appears to have lost its original line breaks and
# indentation: every physical line holds many statements run together, so it
# cannot be parsed as Python in this form. Because the first physical line
# starts with '#', Python reads that whole line (the imports, DotDict and the
# start of Scope included) as a single comment.
#
# What the visible text up to this point defines, in order:
#   DotDict               - dict subclass whose attribute get/set/delete are
#                           the dict item operations, so a missing key raises
#                           KeyError rather than AttributeError.
#   Scope.connect         - classmethod that picks the transport from `device`:
#                           None uses SerialStream.stream_matching(0x0403,
#                           0x6001); an existing path opens
#                           SerialStream(device=...); a string containing ':'
#                           is split into host and port for
#                           asyncio.open_connection; anything else raises
#                           ValueError. It then awaits setup() on the new
#                           instance and returns it.
#   Scope.setup           - resets the device, reads the revision string and
#                           sets the per-revision constants; only 'BS000501'
#                           is accepted, any other revision raises
#                           RuntimeError.
#   Scope.close           - closes the writer if one is set and clears the
#                           stream references; the same function is bound as
#                           __del__.
#   Scope.calculate_lo_hi - maps low/high to fractions with
#                           (value - offset) / scale, applies the d/f/b
#                           correction terms and returns (dl, dh). `params`
#                           defaults to self.analog_params; otherwise it is
#                           unpacked into an AnalogParams.
else self.AnalogParams(*params) l = (low - params.offset) / params.scale h = (high - params.offset) / params.scale al = params.d + params.f*(2*l-1)**2 ah = params.d + params.f*(2*h-1)**2 dl = l - al*(h-2*l)/params.b dh = h + ah*(2*h-l-1)/params.b return dl, dh async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False, period=1e-3, nsamples=1000, timeout=None, low=None, high=None, raw=False, trigger_position=0.25): analog_channels = set() logic_channels = set() for channel in channels: if channel in {'A', 'B'}: analog_channels.add(channel) if trigger is None: trigger = channel elif channel == 'L': logic_channels.update(range(8)) if trigger is None: trigger = {0: 1} elif channel.startswith('L'): i = int(channel[1:]) logic_channels.add(i) if trigger is None: trigger = {i: 1} else: raise ValueError(f"Unrecognised channel: {channel}") if self._awg_running and 4 in logic_channels: logic_channels.remove(4) if 'A' in analog_channels and 7 in logic_channels: logic_channels.remove(7) if 'B' in analog_channels and 6 in logic_channels: logic_channels.remove(6) analog_enable = 0 if 'A' in channels: analog_enable |= 1 if 'B' in channels: analog_enable |= 2 logic_enable = sum(1< capture_mode.clock_high: for clock_scale in range(2, 1<<16): test_ticks = int(round(period / nsamples / self.capture_clock_period / clock_scale)) if test_ticks in range(capture_mode.clock_low, capture_mode.clock_high + 1): ticks = test_ticks break else: continue else: continue if capture_mode.clock_max is not None and ticks > capture_mode.clock_max: ticks = capture_mode.clock_max nsamples = int(round(period / ticks / self.capture_clock_period / clock_scale)) if len(analog_channels) == 2: nsamples -= nsamples % 2 buffer_width = self.capture_buffer_size // capture_mode.sample_width if logic_channels and analog_channels: buffer_width //= 2 if nsamples <= buffer_width: break else: raise RuntimeError("Unable to find appropriate capture mode") if raw: 
# NOTE(review): Scope.capture starts in the line above. Channel names are
# sorted into analog ('A', 'B') and logic ('L' for all eight lines, 'L<n>' for
# one line); any other name raises ValueError. When no trigger is given, the
# first channel that finds `trigger` still None supplies it. Logic lines 4, 7
# and 6 are dropped when the waveform generator is running or channel A / B is
# being captured, respectively.
#
# The statement `logic_enable = sum(1<` breaks off and the text resumes at
# `capture_mode.clock_high:`; `capture_mode` is then used with no visible
# definition, and the trailing `else: raise RuntimeError(...)` implies a loop
# header that is also not visible. Presumably a span between a '<' and a later
# '>' was stripped when this file was extracted - confirm against the upstream
# source before editing this method.
#
# The `channels=['A']` default is a single list shared by every call. The
# visible code only iterates it and tests membership, but an immutable or None
# default would be safer.
lo, hi = low, high else: if low is None: low = self.analog_default_low if high is None: high = self.analog_default_high lo, hi = self.calculate_lo_hi(low, high) if lo < self.analog_lo_min or hi > self.analog_hi_max: Log.warning(f"Reference voltage DAC(s) out of safe range: lo={lo:.3f} hi={hi:.3f}") spock_option = vm.SpockOption.TriggerTypeHardwareComparator kitchen_sink_a = kitchen_sink_b = 0 if self._awg_running: kitchen_sink_b |= vm.KitchenSinkB.WaveformGeneratorEnable if trigger == 'A' or 7 in logic_channels: kitchen_sink_a |= vm.KitchenSinkA.ChannelAComparatorEnable if trigger == 'B' or 6 in logic_channels: kitchen_sink_a |= vm.KitchenSinkA.ChannelBComparatorEnable if analog_channels: kitchen_sink_b |= vm.KitchenSinkB.AnalogFilterEnable if trigger_level is None: trigger_level = (high + low) / 2 trigger_level = (trigger_level - self.analog_params.offset) / self.analog_params.scale if trigger == 'A' or trigger == 'B': if trigger == 'A': spock_option |= vm.SpockOption.TriggerSourceA trigger_logic = 0x80 elif trigger == 'B': spock_option |= vm.SpockOption.TriggerSourceB trigger_logic = 0x40 trigger_mask = 0xff ^ trigger_logic else: trigger_logic = 0 trigger_mask = 0xff for channel, value in trigger.items(): mask = 1< 0.05 and Bzero > 0.05 and Amax < 0.95 and Bmax < 0.95: zero = (Azero + Bzero) / 2 analog_range = self.awg_maximum_voltage / ((Amax + Bmax)/2 - zero) low = -zero * analog_range high = low + analog_range offset = ((Amax - Bmax) + (Azero - Bzero))/2 * analog_range items.append((low, high, lo, hi, offset)) i += 1 await self.stop_generator() items = np.array(items) def f(params, low, high, lo, hi): clo, chi = self.calculate_lo_hi(low, high, params) return np.sqrt((lo-clo)**2 + (hi-chi)**2) result = least_squares(f, self.analog_params, args=items.T[:4], max_nfev=1000, ftol=1e-9, xtol=1e-9, bounds=([0, -np.inf, 200, 0, -np.inf], [np.inf, np.inf, 400, np.inf, 0])) if result.success: Log.info(f"Calibration succeeded: {result.message}") params = 
# NOTE(review): a second gap follows `mask = 1<` in the line above: the text
# resumes at `0.05 and Bzero > 0.05 ...`, inside what the log messages suggest
# is a calibration routine. The rest of capture is not visible, nor are the
# `def` lines of the methods this file calls (start_generator, stop_generator,
# calibrate). No definition or import is visible for `np`, `least_squares`,
# `items`, `Azero`, `Bzero`, `Amax` or `Bmax`.
#
# The visible tail fits AnalogParams with least_squares over the collected
# (low, high, lo, hi) columns of `items`, using calculate_lo_hi as the model.
# On success it stores the fitted parameters in self.analog_params and plus /
# minus half the mean A-B offset in self.analog_offsets; on failure it only
# logs a warning. Either way it returns result.success.
self.analog_params = self.AnalogParams(*result.x) Log.info(f"Analog parameters: d={params.d:.1f}Ω f={params.f:.2f}Ω b={params.b:.1f}Ω scale={params.scale:.3f}V offset={params.offset:.3f}V") clow, chigh = self.calculate_lo_hi(items[:,0], items[:,1]) diff = (np.sqrt(((clow-items[:,2])**2).mean()) + np.sqrt(((chigh-items[:,3])**2).mean())) / (items[:,3]-items[:,2]).mean() Log.info(f"Mean error: {diff*10000:.1f}bps") offsets = items[:,4] offset = offsets.mean() Log.info(f"Mean A-B offset: {offset*1000:.1f}mV (+/- {100*offsets.std()/offset:.1f}%)") self.analog_offsets = {'A': -offset/2, 'B': +offset/2} else: Log.warning(f"Calibration failed: {result.message}") return result.success """ resistance$ ipython3 --pylab Using matplotlib backend: MacOSX In [1]: import pandas In [2]: run scope INFO:scope:Resetting scope INFO:scope:Initialised scope, revision: BS000501 In [3]: generate(2000, 'triangle') Out[3]: 2000.0 In [4]: t = pandas.DataFrame(capture(['A', 'B'], low=0, high=3.3)) In [5]: t.interpolate().plot() Out[5]: In [6]: t = pandas.DataFrame(capture(['L'], low=0, high=3.3)) In [7]: t.plot() Out[7]: In [8]: """ async def main(): global s parser = argparse.ArgumentParser(description="scopething") parser.add_argument('device', nargs='?', default=None, type=str, help="Device to connect to") parser.add_argument('--debug', action='store_true', default=False, help="Debug logging") args = parser.parse_args() logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO, stream=sys.stdout) s = await Scope.connect(args.device) #await s.start_generator(2000, 'triangle') #import numpy as np #x = np.linspace(0, 2*np.pi, s.awg_wavetable_size, endpoint=False) #y = (np.sin(x)**5 + 1) / 2 #await s.start_generator(1000, wavetable=y) def await(g): return asyncio.get_event_loop().run_until_complete(g) def capture(*args, **kwargs): return await(s.capture(*args, **kwargs)) def capturep(*args, **kwargs): import pandas return pandas.DataFrame(capture(*args, **kwargs)) def 
# NOTE(review): main() parses the optional `device` argument and the --debug
# flag, configures logging to stdout and stores the connected Scope in the
# module global `s`. The module-level helpers (capture, capturep, calibrate,
# generate) then run coroutines of `s` to completion on the asyncio event
# loop, for interactive use as in the transcript string above.
#
# They do so through a function named `await`, which has been a reserved
# keyword since Python 3.7, so `def await(g):` is a SyntaxError on current
# interpreters. The helper needs a different name, with its call sites
# updated, before this module can be imported there.
calibrate(*args, **kwargs): return await(s.calibrate(*args, **kwargs)) def generate(*args, **kwargs): return await(s.start_generator(*args, **kwargs)) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())