#!/usr/bin/env python3 import argparse import asyncio import logging import math import os import struct import sys import streams import vm Log = logging.getLogger('scope') class DotDict(dict): __getattr__ = dict.__getitem__ __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ class Scope(vm.VirtualMachine): PARAMS_MAGIC = 0xb0b2 @classmethod async def connect(cls, device=None): if device is None: reader = writer = streams.SerialStream.stream_matching(0x0403, 0x6001) elif os.path.exists(device): reader = writer = streams.SerialStream(device=device) elif ':' in device: host, port = device.split(':', 1) Log.info(f"Connecting to remote scope at {host}:{port}") reader, writer = await asyncio.open_connection(host, int(port)) else: raise ValueError(f"Don't know what to do with '{device}'") scope = cls(reader, writer) await scope.setup() return scope async def setup(self): Log.info("Resetting scope") await self.reset() await self.issue_get_revision() revision = ((await self.read_replies(2))[1]).decode('ascii') if revision == 'BS000501': self.awg_clock_period = 25e-9 self.awg_wavetable_size = 1024 self.awg_sample_buffer_size = 1024 self.awg_minimum_clock = 33 self.awg_maximum_voltage = 3.3 self.analog_params = (18.584, -3.5073, 298.11, 18.253, 0.40815) self.analog_offsets = {'A': -0.011785, 'B': 0.011785} self.analog_min = -5.7 self.analog_max = 8 self.capture_clock_period = 25e-9 self.capture_buffer_size = 12*1024 self.timeout_clock_period = 6.4e-6 self.trigger_low = -7.517 self.trigger_high = 10.816 # await self.load_params() XXX switch this off until I understand EEPROM better self._generator_running = False Log.info(f"Initialised scope, revision: {revision}") def close(self): if self._writer is not None: self._writer.close() self._writer = None self._reader = None __del__ = close async def load_params(self): params = [] for i in range(struct.calcsize(' capture_mode.clock_high: for clock_scale in range(2, 1<<16): test_ticks = int(period / nsamples / 
self.capture_clock_period / clock_scale) if test_ticks in range(capture_mode.clock_low, capture_mode.clock_high + 1): ticks = test_ticks break else: continue break else: raise RuntimeError("Unable to find appropriate capture mode") if capture_mode.clock_max is not None and ticks > capture_mode.clock_max: ticks = capture_mode.clock_max nsamples = int(round(period / ticks / self.capture_clock_period / clock_scale)) total_samples = nsamples*2 if logic and analog_channels else nsamples buffer_width = self.capture_buffer_size // capture_mode.sample_width if total_samples > buffer_width: raise RuntimeError("Capture buffer too small for requested capture") if raw: lo, hi = low, high else: if low is None: low = self.analog_min if high is None: high = self.analog_max lo, hi = self.calculate_lo_hi(low, high) analog_enable = 0 if 'A' in channels: analog_enable |= 1 if 'B' in channels: analog_enable |= 2 logic_enable = 0 for channel in range(8): if not ((channel == 4 and self._generator_running) or (channel == 6 and 'A' in channels) or (channel == 7 and 'B' in channels)): logic_enable |= 1<{asamples}h', data) data = [(value/65536+0.5)*value_multiplier + value_offset for value in data] else: data = [(value/256)*value_multiplier + value_offset for value in data] traces[channel] = {(t+dump_channel*ticks*clock_scale)*self.capture_clock_period: value for (t, value) in zip(range(start_timestamp, timestamp, ticks*clock_scale*len(analog_channels)), data)} if logic: async with self.transaction(): await self.set_registers(SampleAddress=(address - total_samples) % buffer_width, DumpMode=vm.DumpMode.Raw, DumpChan=128, DumpCount=nsamples, DumpRepeat=1, DumpSend=1, DumpSkip=0) await self.issue_program_spock_registers() await self.issue_analog_dump_binary() data = await self._reader.readexactly(nsamples) ts = [t*self.capture_clock_period for t in range(start_timestamp, timestamp, ticks*clock_scale)] for i in range(8): mask = 1< In [6]: """ async def main(): global s parser = 
async def main():
    """Command-line entry point: parse arguments, set up logging, connect.

    The connected scope is stored in the module-level global ``s`` so that
    the synchronous helpers below (``capture``, ``calibrate``, ``generate``)
    can drive it from an interactive session.
    """
    global s
    parser = argparse.ArgumentParser(description="scopething")
    parser.add_argument('device', nargs='?', default=None, type=str, help="Device to connect to")
    parser.add_argument('--debug', action='store_true', default=False, help="Debug logging")
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO, stream=sys.stdout)
    s = await Scope.connect(args.device)
    # Example of starting the waveform generator with a custom wavetable:
    #x = np.linspace(0, 2*np.pi, s.awg_wavetable_size, endpoint=False)
    #y = np.round((np.sin(x)**5)*127 + 128, 0).astype('uint8')
    #await s.start_generator(1000, wavetable=y)


def _event_loop():
    """Return the thread's current event loop, creating one if necessary.

    ``asyncio.get_event_loop()`` is deprecated when no loop has been set and
    raises ``RuntimeError`` in that situation on newer Python releases.  The
    same loop has to be reused by ``main()`` and by every helper call, so a
    freshly created loop is installed as the current one.
    """
    import warnings

    try:
        with warnings.catch_warnings():
            # Silence the "there is no current event loop" deprecation notice.
            warnings.simplefilter('ignore', DeprecationWarning)
            loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


def await_(g):
    """Run the awaitable *g* to completion on the shared loop; return its result."""
    return _event_loop().run_until_complete(g)


# This helper used to be spelled ``await``.  That word is a reserved keyword
# from Python 3.7 onwards and can no longer appear in a ``def``, so the old
# name is published through the module namespace instead, for any code that
# still looks it up by name.
globals()['await'] = await_


def capture(*args, **kwargs):
    """Blocking wrapper around ``s.capture(...)`` for interactive use."""
    return await_(s.capture(*args, **kwargs))


def calibrate(*args, **kwargs):
    """Blocking wrapper around ``s.calibrate(...)`` for interactive use."""
    return await_(s.calibrate(*args, **kwargs))


def generate(*args, **kwargs):
    """Blocking wrapper around ``s.start_generator(...)`` for interactive use."""
    return await_(s.start_generator(*args, **kwargs))


if __name__ == '__main__':
    _event_loop().run_until_complete(main())