1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-13 18:52:10 +01:00

Linter fixes

This commit is contained in:
2020-06-27 14:26:33 +01:00
parent b6f4551208
commit 8c522205e6

View File

@ -7,7 +7,6 @@ from collections import namedtuple
from configparser import ConfigParser
import logging
import math
import os
from pathlib import Path
import sys
from urllib.parse import urlparse
@ -25,6 +24,7 @@ ANALOG_PARAMETERS_PATH = Path('~/.config/scopething/analog.conf').expanduser()
class UsageError(Exception):
pass
class ConfigurationError(Exception):
pass
@ -66,7 +66,7 @@ class Scope(vm.VirtualMachine):
if revision == 'BS000501':
self.master_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate
self.capture_buffer_size = 12<<10
self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024
self.awg_minimum_clock = 33
@ -75,8 +75,8 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07
self.analog_hi_max = 0.88
self.timeout_clock_period = (1<<8) * self.master_clock_period
self.timestamp_rollover = (1<<32) * self.master_clock_period
self.timeout_clock_period = (1 << 8) * self.master_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period
else:
raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False
@ -122,10 +122,10 @@ class Scope(vm.VirtualMachine):
def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams):
params = self.AnalogParams(*list(params) + [None]*(11-len(params)))
l = (low - params.offset) / params.scale
h = (high - params.offset) / params.scale
dl = params.la*l + params.lb*h + params.lc
dh = params.ha*h + params.hb*l + params.hc
lo = (low - params.offset) / params.scale
hi = (high - params.offset) / params.scale
dl = params.la*lo + params.lb*hi + params.lc
dh = params.ha*hi + params.hb*lo + params.hc
return dl, dh
async def capture(self, channels=['A'], trigger=None, trigger_level=None, trigger_type='rising', hair_trigger=False,
@ -157,8 +157,8 @@ class Scope(vm.VirtualMachine):
logic_channels.remove(7)
if 'B' in analog_channels and 6 in logic_channels:
logic_channels.remove(6)
analog_enable = sum(1<<(ord(channel)-ord('A')) for channel in analog_channels)
logic_enable = sum(1<<channel for channel in logic_channels)
analog_enable = sum(1 << (ord(channel)-ord('A')) for channel in analog_channels)
logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples))
@ -177,7 +177,7 @@ class Scope(vm.VirtualMachine):
ticks = capture_mode.clock_high
LOG.debug(f"- try with tick count {ticks}")
else:
LOG.debug(f"- mode too slow")
LOG.debug("- mode too slow")
continue
n = int(round(period / self.master_clock_period / ticks / clock_scale))
if len(analog_channels) == 2:
@ -195,9 +195,9 @@ class Scope(vm.VirtualMachine):
sample_period = ticks*clock_scale*self.master_clock_period
sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6:
LOG.warn(f"Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
LOG.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0
if raw:
analog_params = None
lo, hi = low, high
@ -246,7 +246,7 @@ class Scope(vm.VirtualMachine):
raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7:
raise ValueError("Unrecognised trigger value")
mask = 1<<channel
mask = 1 << channel
trigger_mask &= ~mask
if value:
trigger_logic |= mask
@ -266,7 +266,7 @@ class Scope(vm.VirtualMachine):
trigger_timeout = 0
else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period
+ timeout)/self.timeout_clock_period))
+ timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0:
raise ConfigurationError("Required trigger timeout too long")
@ -274,7 +274,7 @@ class Scope(vm.VirtualMachine):
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
LOG.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
async with self.transaction():
await self.set_registers(TraceMode=capture_mode.trace_mode, BufferMode=capture_mode.buffer_mode,
SampleAddress=0, ClockTicks=ticks, ClockScale=clock_scale,
@ -282,12 +282,11 @@ class Scope(vm.VirtualMachine):
TraceIntro=trace_intro, TraceOutro=trace_outro, TraceDelay=0, Timeout=trigger_timeout,
TriggerIntro=trigger_intro//2, TriggerOutro=trigger_outro//2, Prelude=0,
SpockOption=spock_option, ConverterLo=lo, ConverterHi=hi,
KitchenSinkA=kitchen_sink_a, KitchenSinkB=kitchen_sink_b,
KitchenSinkA=kitchen_sink_a, KitchenSinkB=kitchen_sink_b,
AnalogEnable=analog_enable, DigitalEnable=logic_enable)
await self.issue_program_spock_registers()
await self.issue_configure_device_hardware()
await self.issue_triggered_trace()
begin_timestamp = None
while True:
try:
code, timestamp = (int(x, 16) for x in await self.read_replies(2))
@ -298,20 +297,19 @@ class Scope(vm.VirtualMachine):
cause = {vm.TraceStatus.Done: 'trigger', vm.TraceStatus.Auto: 'timeout', vm.TraceStatus.Stop: 'cancel'}[code]
start_timestamp = timestamp - nsamples*ticks*clock_scale
if start_timestamp < 0:
start_timestamp += 1<<32
timestamp += 1<<32
start_timestamp += 1 << 32
timestamp += 1 << 32
address = int((await self.read_replies(1))[0], 16)
if capture_mode.analog_channels == 2:
address -= address % 2
traces = DotDict()
timestamps = array.array('d', (t*self.master_clock_period for t in range(start_timestamp, timestamp, ticks*clock_scale)))
start_time = start_timestamp*self.master_clock_period
for dump_channel, channel in enumerate(sorted(analog_channels)):
asamples = nsamples // len(analog_channels)
async with self.transaction():
await self.set_registers(SampleAddress=(address - nsamples) % buffer_width,
DumpMode=vm.DumpMode.Native if capture_mode.sample_width == 2 else vm.DumpMode.Raw,
DumpMode=vm.DumpMode.Native if capture_mode.sample_width == 2 else vm.DumpMode.Raw,
DumpChan=dump_channel, DumpCount=asamples, DumpRepeat=1, DumpSend=1, DumpSkip=0)
await self.issue_program_spock_registers()
await self.issue_analog_dump_binary()
@ -330,7 +328,7 @@ class Scope(vm.VirtualMachine):
await self.issue_analog_dump_binary()
data = await self.read_logic_samples(nsamples)
for i in logic_channels:
mask = 1<<i
mask = 1 << i
traces[f'L{i}'] = DotDict({'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period,
@ -433,7 +431,7 @@ class Scope(vm.VirtualMachine):
async def calibrate(self, probes='x1', n=32, save=True):
"""
Derive values for the analogue parameters based on generating a 3.3V 10kHz clock
Derive values for the analogue parameters based on generating a 3.3V 2kHz clock
signal and then sampling the analogue channels to measure this. The first step is
to set the low and high range DACs to 1/3 and 2/3, respectively. This results in
*neutral* voltages matching the three series 300Ω resistances created by the ADC
@ -458,19 +456,21 @@ class Scope(vm.VirtualMachine):
import numpy as np
from scipy.optimize import minimize
items = []
async def measure(lo, hi, period=2e-3, chop=True):
if chop:
traces = await self.capture(channels=['A','B'], period=period, nsamples=2000, timeout=0, low=lo, high=hi, raw=True)
traces = await self.capture(channels=['A', 'B'], period=period, nsamples=2000, timeout=0, low=lo, high=hi, raw=True)
A = np.array(traces.A.samples)
B = np.array(traces.B.samples)
else:
A = np.array((await self.capture(channels=['A'], period=period/2, nsamples=1000, timeout=0, low=lo, high=hi, raw=True)).A.samples)
B = np.array((await self.capture(channels=['B'], period=period/2, nsamples=1000, timeout=0, low=lo, high=hi, raw=True)).B.samples)
Amean = A.mean()
Azero, Afull = np.median(A[A<=Amean]), np.median(A[A>=Amean])
Azero, Afull = np.median(A[A <= Amean]), np.median(A[A >= Amean])
Bmean = B.mean()
Bzero, Bfull = np.median(B[B<=Bmean]), np.median(B[B>=Bmean])
Bzero, Bfull = np.median(B[B <= Bmean]), np.median(B[B >= Bmean])
return (Azero + Bzero) / 2, (Afull + Bfull) / 2, ((Afull - Bfull) + (Azero - Azero)) / 2
await self.start_clock(frequency=2000)
zero, full, offset = await measure(1/3, 2/3)
zero = (zero + 1) / 3
@ -480,27 +480,30 @@ class Scope(vm.VirtualMachine):
LOG.info(f"Analog full range = {analog_scale:.2f}V, zero offset = {analog_offset:.2f}V")
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n):
period = 2e-3 if len(items) % 4 < 2 else 1e-3
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero:
analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T
def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
return np.sqrt((lo-dl)**2 + (hi-dh)**2).mean()
start_params = self.analog_params.get(probes, [1,0,0,1,0,0])[:6]
start_params = self.analog_params.get(probes, [1, 0, 0, 1, 0, 0])[:6]
result = minimize(f, start_params, method='SLSQP',
bounds=[(1,np.inf), (-np.inf,0), (0,np.inf), (1,np.inf), (-np.inf,0), (-np.inf,0)],
bounds=[(1, np.inf), (-np.inf, 0), (0, np.inf), (1, np.inf), (-np.inf, 0), (-np.inf, 0)],
constraints=[{'type': 'eq', 'fun': lambda x: x[0]*1/3 + x[1]*2/3 + x[2] - 1/3},
{'type': 'eq', 'fun': lambda x: x[3]*2/3 + x[4]*1/3 + x[5] - 2/3}])
if result.success:
LOG.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x):
lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
safe_low, safe_high = minimize(f, (low[0], high[0])).x
offset_mean = offset.mean()
params = self.analog_params[probes] = self.AnalogParams(*result.x, analog_scale, analog_offset, safe_low, safe_high, offset_mean)
@ -537,6 +540,7 @@ In [5]: plot(traces.B.timestamps, traces.B.samples)
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
async def main():
global s
parser = argparse.ArgumentParser(description="scopething")
@ -547,6 +551,7 @@ async def main():
logging.basicConfig(level=logging.DEBUG if args.debug else (logging.INFO if args.verbose else logging.WARNING), stream=sys.stdout)
s = await Scope().connect(args.url)
def await_(g):
task = asyncio.Task(g)
while True:
@ -555,23 +560,28 @@ def await_(g):
except KeyboardInterrupt:
task.cancel()
def capture(*args, **kwargs):
return await_(s.capture(*args, **kwargs))
def capturep(*args, **kwargs):
import pandas
traces = capture(*args, **kwargs)
return pandas.DataFrame({channel: pandas.Series(trace.samples, trace.timestamps) for (channel,trace) in traces.items()})
return pandas.DataFrame({channel: pandas.Series(trace.samples, trace.timestamps) for (channel, trace) in traces.items()})
def calibrate(*args, **kwargs):
return await_(s.calibrate(*args, **kwargs))
def start_waveform(*args, **kwargs):
return await_(s.start_waveform(*args, **kwargs))
def start_clock(*args, **kwargs):
return await_(s.start_clock(*args, **kwargs))
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())