diff --git a/analysis.py b/analysis.py index d6e3518..6e2b9c8 100644 --- a/analysis.py +++ b/analysis.py @@ -1,3 +1,11 @@ +""" +analysis +======== + +Library code for analysing captures returned by `Scope.capture()`. +""" + +# pylama:ignore=C0103,R1716 import numpy as np @@ -62,7 +70,7 @@ def extract_waveform(series, period): p = int(round(series.sample_rate * period)) n = len(series.samples) // p if n <= 2: - return None, None + return None, None, None, None samples = np.array(series.samples)[:p*n] cumsum = samples.cumsum() underlying = (cumsum[p:] - cumsum[:-p]) / p @@ -94,7 +102,7 @@ def normalize_waveform(samples, smooth=7): crossings.append((i - last_rising, last_rising)) if first_falling is not None: crossings.append((n + first_falling - last_rising, last_rising)) - width, first = min(crossings) + first = min(crossings)[1] wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings) @@ -104,7 +112,7 @@ def characterize_waveform(samples, crossings): possibles = [] if len(crossings) == 1: duty_cycle = crossings[0][1] / n - if duty_cycle > 0.45 and duty_cycle < 0.55: + if 0.45 < duty_cycle < 0.55: possibles.append((rms(samples - sine_wave(n)), 'sine', None)) possibles.append((rms(samples - triangle_wave(n)), 'triangle', None)) possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None)) diff --git a/scope.py b/scope.py index e6848bf..2cd4b9e 100755 --- a/scope.py +++ b/scope.py @@ -1,4 +1,13 @@ -#!/usr/bin/env python3 +""" +scope +===== + +Code for talking to the BitScope series of USB digital mixed-signal scopes. +Only supports the BS000501 at the moment, but that's only because it's never +been tested on any other model. +""" + +# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102 import argparse import array @@ -178,23 +187,23 @@ class Scope(vm.VirtualMachine): else: Log.debug("- mode too slow") continue - n = int(round(period / self.master_clock_period / ticks / clock_scale)) + actual_nsamples = int(round(period / self.master_clock_period / ticks / clock_scale)) if len(analog_channels) == 2: - n -= n % 2 + actual_nsamples -= actual_nsamples % 2 buffer_width = self.capture_buffer_size // capture_mode.sample_width if logic_channels and analog_channels: buffer_width //= 2 - if n <= buffer_width: - Log.debug(f"- OK; period is {n} samples") - nsamples = n + if actual_nsamples <= buffer_width: + Log.debug(f"- OK; period is {actual_nsamples} samples") + nsamples = actual_nsamples break - Log.debug(f"- insufficient buffer space for necessary {n} samples") + Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples") else: raise ConfigurationError("Unable to find appropriate capture mode") sample_period = ticks*clock_scale*self.master_clock_period sample_rate = 1/sample_period if trigger_position and sample_rate > 5e6: - Log.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0") + Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0") trigger_position = 0 if raw: @@ -225,7 +234,7 @@ class Scope(vm.VirtualMachine): if trigger_level is None: trigger_level = (high + low) / 2 analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level - if trigger == 'A' or trigger == 'B': + if trigger in {'A', 'B'}: if trigger == 'A': spock_option |= vm.SpockOption.TriggerSourceA trigger_logic = 0x80 @@ -239,7 +248,7 @@ class Scope(vm.VirtualMachine): for channel, value in trigger.items(): if isinstance(channel, str): if channel.startswith('L'): - channel = int(channel[1:]) + channel = int(channel[1:]) # noqa else: raise ValueError("Unrecognised trigger value") if channel < 0 or channel > 7: @@ -268,8 +277,7 @@ class Scope(vm.VirtualMachine): if trigger_timeout > vm.Registers.Timeout.maximum_value: if timeout > 0: raise ConfigurationError("Required trigger timeout too long") - else: - raise ConfigurationError("Required trigger timeout too long, use a later trigger position") + raise ConfigurationError("Required trigger timeout too long, use a later trigger position") Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture " f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})") @@ -372,7 +380,7 @@ class Scope(vm.VirtualMachine): Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}") break error = abs(frequency - actualf) / frequency - if error < max_error and (best_solution is None or error < best_solution[0]): + if error < max_error and (best_solution is None or error < best_solution[0]): # noqa best_solution = error, size, nwaves, clock, actualf else: if best_solution is None: @@ -495,11 +503,11 @@ class Scope(vm.VirtualMachine): for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False): for hi in np.linspace(self.analog_hi_max, 0.5, n): zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0) - if zero > 0.01 and full < 0.99 and full > zero: + if 0.01 < zero < full < 0.99: analog_range = self.clock_voltage / (full - zero) items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range)) await self.stop_clock() - lo, hi, low, high, offset = np.array(items).T + lo, hi, low, high, offset = np.array(items).T # noqa def f(params): dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None)) @@ -514,7 +522,7 @@ class Scope(vm.VirtualMachine): Log.info(f"Calibration succeeded: {result.message}") params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None) - def f(x): + def f(x): # noqa lo, hi = self.calculate_lo_hi(x[0], x[1], params) return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2) @@ -536,23 +544,21 @@ class Scope(vm.VirtualMachine): return f"" -""" -$ ipython3 --pylab -Using matplotlib backend: MacOSX - -In [1]: run scope - -In [2]: start_waveform(2000, 'triangle') -Out[2]: 2000.0 - -In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3) - -In [4]: plot(traces.A.timestamps, traces.A.samples) -Out[4]: [] - -In [5]: plot(traces.B.timestamps, traces.B.samples) -Out[5]: [] -""" +# $ ipython3 --pylab +# Using matplotlib backend: MacOSX +# +# In [1]: run scope +# +# In [2]: start_waveform(2000, 'triangle') +# Out[2]: 2000.0 +# +# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3) +# +# In [4]: plot(traces.A.timestamps, traces.A.samples) +# Out[4]: [] +# +# In [5]: plot(traces.B.timestamps, traces.B.samples) +# Out[5]: [] async def main(): diff --git a/streams.py b/streams.py index 5f13b8f..d450a20 100644 --- a/streams.py +++ b/streams.py @@ -5,6 +5,8 @@ streams Package for asynchronous serial IO. """ +# pylama:ignore=W1203,R0916,W0703 + import asyncio import logging import sys @@ -20,14 +22,14 @@ Log = logging.getLogger(__name__) class SerialStream: @classmethod - def devices_matching(cls, vid=None, pid=None, serial=None): + def devices_matching(cls, vid=None, pid=None, serial_number=None): for port in comports(): - if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number): + if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial_number == port.serial_number): yield port.device @classmethod - def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs): - for device in cls.devices_matching(vid, pid, serial): + def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs): + for device in cls.devices_matching(vid, pid, serial_number): return SerialStream(device, **kwargs) raise RuntimeError("No matching serial device") @@ -59,15 +61,15 @@ class SerialStream: return if not self._output_buffer: try: - n = self._connection.write(data) + nbytes = self._connection.write(data) except serial.SerialTimeoutException: - n = 0 + nbytes = 0 except Exception: Log.exception("Error writing to stream") raise - if n: - Log.debug(f"Write {data[:n]!r}") - self._output_buffer = data[n:] + if nbytes: + Log.debug(f"Write {data[:nbytes]!r}") + self._output_buffer = data[nbytes:] else: self._output_buffer += data if self._output_buffer and self._output_buffer_empty is None: @@ -80,16 +82,16 @@ class SerialStream: def _feed_data(self): try: - n = self._connection.write(self._output_buffer) + nbytes = self._connection.write(self._output_buffer) except serial.SerialTimeoutException: - n = 0 - except Exception as e: + nbytes = 0 + except Exception as exc: Log.exception("Error writing to stream") - self._output_buffer_empty.set_exception(e) + self._output_buffer_empty.set_exception(exc) self._loop.remove_writer(self._connection) - if n: - Log.debug(f"Write {self._output_buffer[:n]!r}") - self._output_buffer = self._output_buffer[n:] + if nbytes: + Log.debug(f"Write {self._output_buffer[:nbytes]!r}") + self._output_buffer = self._output_buffer[nbytes:] if not self._output_buffer: self._loop.remove_writer(self._connection) self._output_buffer_empty.set_result(None) @@ -101,40 +103,39 @@ class SerialStream: data = bytes(self._output_buffer) self._output_buffer_lock.release() try: - n = self._connection.write(data) + nbytes = self._connection.write(data) finally: self._output_buffer_lock.acquire() - Log.debug(f"Write {self._output_buffer[:n]!r}") - self._output_buffer = self._output_buffer[n:] + Log.debug(f"Write {self._output_buffer[:nbytes]!r}") + self._output_buffer = self._output_buffer[nbytes:] self._output_buffer_empty = None - async def read(self, n=None): + async def read(self, nbytes=None): if self._use_threads: - return await self._loop.run_in_executor(None, self._read_blocking, n) + return await self._loop.run_in_executor(None, self._read_blocking, nbytes) while True: - w = self._connection.in_waiting - if w: - data = self._connection.read(w if n is None else min(n, w)) + nwaiting = self._connection.in_waiting + if nwaiting: + data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting)) Log.debug(f"Read {data!r}") return data - else: - future = self._loop.create_future() - self._loop.add_reader(self._connection, future.set_result, None) - try: - await future - finally: - self._loop.remove_reader(self._connection) + future = self._loop.create_future() + self._loop.add_reader(self._connection, future.set_result, None) + try: + await future + finally: + self._loop.remove_reader(self._connection) - def _read_blocking(self, n=None): + def _read_blocking(self, nbytes=None): data = self._connection.read(1) - w = self._connection.in_waiting - if w and (n is None or n > 1): - data += self._connection.read(w if n is None else min(n-1, w)) + nwaiting = self._connection.in_waiting + if nwaiting and (nbytes is None or nbytes > 1): + data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting)) Log.debug(f"Read {data!r}") return data - async def readexactly(self, n): + async def readexactly(self, nbytes): data = b'' - while len(data) < n: - data += await self.read(n-len(data)) + while len(data) < nbytes: + data += await self.read(nbytes-len(data)) return data diff --git a/utils.py b/utils.py index cfd6e97..948717b 100644 --- a/utils.py +++ b/utils.py @@ -1,3 +1,9 @@ +""" +utils +===== + +Random utility classes/functions. +""" class DotDict(dict): diff --git a/vm.py b/vm.py index d716a72..a2688ce 100644 --- a/vm.py +++ b/vm.py @@ -1,4 +1,3 @@ - """ vm == @@ -13,6 +12,8 @@ document][VM01B]. """ +# pylama:ignore=E221,C0326,R0904,W1203 + import array from collections import namedtuple from enum import IntEnum @@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])): else: width = int(self.dtype[1:]) if sign == 'U': - n = 1 << width - value = max(0, min(value, n-1)) - bs = struct.pack('= 0: reply = data[:index] @@ -333,25 +331,25 @@ class VirtualMachine: async def set_registers(self, **kwargs): cmd = '' - r0 = r1 = None + register0 = register1 = None for base, name in sorted((Registers[name].base, name) for name in kwargs): register = Registers[name] - bs = register.encode(kwargs[name]) - Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}") - for i, byte in enumerate(bs): + data = register.encode(kwargs[name]) + Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}") + for i, byte in enumerate(data): if cmd: cmd += 'z' - r1 += 1 + register1 += 1 address = base + i - if r1 is None or address > r1 + 3: + if register1 is None or address > register1 + 3: cmd += f'{address:02x}@' - r0 = r1 = address + register0 = register1 = address else: - cmd += 'n' * (address - r1) - r1 = address - if byte != r0: + cmd += 'n' * (address - register1) + register1 = address + if byte != register0: cmd += '[' if byte == 0 else f'{byte:02x}' - r0 = byte + register0 = byte if cmd: await self.issue(cmd + 's') @@ -384,22 +382,21 @@ class VirtualMachine: async def issue_triggered_trace(self): await self.issue(b'D') - async def read_analog_samples(self, n, sample_width): + async def read_analog_samples(self, nsamples, sample_width): if self._transactions: raise TypeError("Command transaction in progress") if sample_width == 2: - data = await self._reader.readexactly(2*n) + data = await self._reader.readexactly(2 * nsamples) return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data))) - elif sample_width == 1: - data = await self._reader.readexactly(n) + if sample_width == 1: + data = await self._reader.readexactly(nsamples) return array.array('f', (value/256 for value in data)) - else: - raise ValueError(f"Bad sample width: {sample_width}") + raise ValueError(f"Bad sample width: {sample_width}") - async def read_logic_samples(self, n): + async def read_logic_samples(self, nsamples): if self._transactions: raise TypeError("Command transaction in progress") - return await self._reader.readexactly(n) + return await self._reader.readexactly(nsamples) async def issue_cancel_trace(self): await self.issue(b'K') @@ -413,15 +410,15 @@ class VirtualMachine: async def issue_wavetable_read(self): await self.issue(b'R') - async def wavetable_read_bytes(self, n): + async def wavetable_read_bytes(self, nbytes): if self._transactions: raise TypeError("Command transaction in progress") - return await self._reader.readexactly(n) + return await self._reader.readexactly(nbytes) - async def wavetable_write_bytes(self, bs): + async def wavetable_write_bytes(self, data): cmd = '' last_byte = None - for byte in bs: + for byte in data: if byte != last_byte: cmd += f'{byte:02x}' cmd += 'W'