1
0
mirror of https://github.com/jonathanhogg/scopething synced 2025-07-14 11:12:09 +01:00

Compare commits

..

8 Commits

6 changed files with 178 additions and 156 deletions

Binary file not shown.

View File

@ -1,3 +1,11 @@
"""
analysis
========
Library code for analysing captures returned by `Scope.capture()`.
"""
# pylama:ignore=C0103,R1716
import numpy as np import numpy as np
@ -62,7 +70,7 @@ def extract_waveform(series, period):
p = int(round(series.sample_rate * period)) p = int(round(series.sample_rate * period))
n = len(series.samples) // p n = len(series.samples) // p
if n <= 2: if n <= 2:
return None, None return None, None, None, None
samples = np.array(series.samples)[:p*n] samples = np.array(series.samples)[:p*n]
cumsum = samples.cumsum() cumsum = samples.cumsum()
underlying = (cumsum[p:] - cumsum[:-p]) / p underlying = (cumsum[p:] - cumsum[:-p]) / p
@ -94,7 +102,7 @@ def normalize_waveform(samples, smooth=7):
crossings.append((i - last_rising, last_rising)) crossings.append((i - last_rising, last_rising))
if first_falling is not None: if first_falling is not None:
crossings.append((n + first_falling - last_rising, last_rising)) crossings.append((n + first_falling - last_rising, last_rising))
width, first = min(crossings) first = min(crossings)[1]
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings) return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
@ -104,7 +112,7 @@ def characterize_waveform(samples, crossings):
possibles = [] possibles = []
if len(crossings) == 1: if len(crossings) == 1:
duty_cycle = crossings[0][1] / n duty_cycle = crossings[0][1] / n
if duty_cycle > 0.45 and duty_cycle < 0.55: if 0.45 < duty_cycle < 0.55:
possibles.append((rms(samples - sine_wave(n)), 'sine', None)) possibles.append((rms(samples - sine_wave(n)), 'sine', None))
possibles.append((rms(samples - triangle_wave(n)), 'triangle', None)) possibles.append((rms(samples - triangle_wave(n)), 'triangle', None))
possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None)) possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None))

134
scope.py
View File

@ -1,4 +1,13 @@
#!/usr/bin/env python3 """
scope
=====
Code for talking to the BitScope series of USB digital mixed-signal scopes.
Only supports the BS000501 at the moment, but that's only because it's never
been tested on any other model.
"""
# pylama:ignore=E0611,E1101,W0201,W1203,W0631,C0103,R0902,R0912,R0913,R0914,R0915,C0415,W0601,W0102
import argparse import argparse
import array import array
@ -43,8 +52,8 @@ class Scope(vm.VirtualMachine):
break break
else: else:
raise RuntimeError("No matching serial device found") raise RuntimeError("No matching serial device found")
Log.info(f"Connecting to scope at {url}")
self.close() self.close()
Log.info(f"Connecting to scope at {url}")
parts = urlparse(url, scheme='file') parts = urlparse(url, scheme='file')
if parts.scheme == 'file': if parts.scheme == 'file':
self._reader = self._writer = streams.SerialStream(device=parts.path) self._reader = self._writer = streams.SerialStream(device=parts.path)
@ -63,8 +72,8 @@ class Scope(vm.VirtualMachine):
await self.issue_get_revision() await self.issue_get_revision()
revision = ((await self.read_replies(2))[1]).decode('ascii') revision = ((await self.read_replies(2))[1]).decode('ascii')
if revision == 'BS000501': if revision == 'BS000501':
self.master_clock_rate = 40000000 self.primary_clock_rate = 40000000
self.master_clock_period = 1/self.master_clock_rate self.primary_clock_period = 1/self.primary_clock_rate
self.capture_buffer_size = 12 << 10 self.capture_buffer_size = 12 << 10
self.awg_wavetable_size = 1024 self.awg_wavetable_size = 1024
self.awg_sample_buffer_size = 1024 self.awg_sample_buffer_size = 1024
@ -74,8 +83,8 @@ class Scope(vm.VirtualMachine):
self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)} self.analog_params = {'x1': self.AnalogParams(1.1, -.05, 0, 1.1, -.05, -.05, 18.333, -7.517, -5.5, 8, 0)}
self.analog_lo_min = 0.07 self.analog_lo_min = 0.07
self.analog_hi_max = 0.88 self.analog_hi_max = 0.88
self.timeout_clock_period = (1 << 8) * self.master_clock_period self.timeout_clock_period = (1 << 8) * self.primary_clock_period
self.timestamp_rollover = (1 << 32) * self.master_clock_period self.timestamp_rollover = (1 << 32) * self.primary_clock_period
else: else:
raise RuntimeError(f"Unsupported scope, revision: {revision}") raise RuntimeError(f"Unsupported scope, revision: {revision}")
self._awg_running = False self._awg_running = False
@ -115,8 +124,8 @@ class Scope(vm.VirtualMachine):
self.close() self.close()
def close(self): def close(self):
super().close() if super().close():
Log.info("Closed scope") Log.info("Closed scope")
def calculate_lo_hi(self, low, high, params): def calculate_lo_hi(self, low, high, params):
if not isinstance(params, self.AnalogParams): if not isinstance(params, self.AnalogParams):
@ -160,14 +169,16 @@ class Scope(vm.VirtualMachine):
logic_enable = sum(1 << channel for channel in logic_channels) logic_enable = sum(1 << channel for channel in logic_channels)
for capture_mode in vm.CaptureModes: for capture_mode in vm.CaptureModes:
ticks = int(round(period / self.master_clock_period / nsamples)) ticks = int(round(period / self.primary_clock_period / nsamples))
clock_scale = 1 clock_scale = 1
if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels): if capture_mode.analog_channels == len(analog_channels) and capture_mode.logic_channels == bool(logic_channels):
Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...") Log.debug(f"Considering trace mode {capture_mode.trace_mode.name}...")
if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1: if ticks > capture_mode.clock_high and capture_mode.clock_divide > 1:
clock_scale = int(math.ceil(period / self.master_clock_period / nsamples / capture_mode.clock_high)) clock_scale = min(capture_mode.clock_divide, int(math.ceil(period / self.primary_clock_period / nsamples / capture_mode.clock_high)))
ticks = int(round(period / self.master_clock_period / nsamples / clock_scale)) ticks = int(round(period / self.primary_clock_period / nsamples / clock_scale))
if ticks in range(capture_mode.clock_low, capture_mode.clock_high+1): if ticks > capture_mode.clock_low:
if ticks > capture_mode.clock_high:
ticks = capture_mode.clock_high
Log.debug(f"- try with tick count {ticks} x {clock_scale}") Log.debug(f"- try with tick count {ticks} x {clock_scale}")
else: else:
continue continue
@ -178,23 +189,23 @@ class Scope(vm.VirtualMachine):
else: else:
Log.debug("- mode too slow") Log.debug("- mode too slow")
continue continue
n = int(round(period / self.master_clock_period / ticks / clock_scale)) actual_nsamples = int(round(period / self.primary_clock_period / ticks / clock_scale))
if len(analog_channels) == 2: if len(analog_channels) == 2:
n -= n % 2 actual_nsamples -= actual_nsamples % 2
buffer_width = self.capture_buffer_size // capture_mode.sample_width buffer_width = self.capture_buffer_size // capture_mode.sample_width
if logic_channels and analog_channels: if logic_channels and analog_channels:
buffer_width //= 2 buffer_width //= 2
if n <= buffer_width: if actual_nsamples <= buffer_width:
Log.debug(f"- OK; period is {n} samples") Log.debug(f"- OK; period is {actual_nsamples} samples")
nsamples = n nsamples = actual_nsamples
break break
Log.debug(f"- insufficient buffer space for necessary {n} samples") Log.debug(f"- insufficient buffer space for necessary {actual_nsamples} samples")
else: else:
raise ConfigurationError("Unable to find appropriate capture mode") raise ConfigurationError("Unable to find appropriate capture mode")
sample_period = ticks*clock_scale*self.master_clock_period sample_period = ticks*clock_scale*self.primary_clock_period
sample_rate = 1/sample_period sample_rate = 1/sample_period
if trigger_position and sample_rate > 5e6: if trigger_position and sample_rate > 5e6:
Log.warn("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0") Log.warning("Pre-trigger capture not supported above 5M samples/s; forcing trigger_position=0")
trigger_position = 0 trigger_position = 0
if raw: if raw:
@ -225,21 +236,13 @@ class Scope(vm.VirtualMachine):
if trigger_level is None: if trigger_level is None:
trigger_level = (high + low) / 2 trigger_level = (high + low) / 2
analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level analog_trigger_level = (trigger_level - analog_params.offset) / analog_params.scale if not raw else trigger_level
if trigger == 'A' or trigger == 'B': if isinstance(trigger, dict):
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
elif isinstance(trigger, dict):
trigger_logic = 0 trigger_logic = 0
trigger_mask = 0xff trigger_mask = 0xff
for channel, value in trigger.items(): for channel, value in trigger.items():
if isinstance(channel, str): if isinstance(channel, str):
if channel.startswith('L'): if channel.startswith('L'):
channel = int(channel[1:]) channel = int(channel[1:]) # noqa
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
if channel < 0 or channel > 7: if channel < 0 or channel > 7:
@ -248,6 +251,14 @@ class Scope(vm.VirtualMachine):
trigger_mask &= ~mask trigger_mask &= ~mask
if value: if value:
trigger_logic |= mask trigger_logic |= mask
elif trigger in {'A', 'B'}:
if trigger == 'A':
spock_option |= vm.SpockOption.TriggerSourceA
trigger_logic = 0x80
elif trigger == 'B':
spock_option |= vm.SpockOption.TriggerSourceB
trigger_logic = 0x40
trigger_mask = 0xff ^ trigger_logic
else: else:
raise ValueError("Unrecognised trigger value") raise ValueError("Unrecognised trigger value")
trigger_type = trigger_type.lower() trigger_type = trigger_type.lower()
@ -263,13 +274,12 @@ class Scope(vm.VirtualMachine):
if timeout is None: if timeout is None:
trigger_timeout = 0 trigger_timeout = 0
else: else:
trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.master_clock_period trigger_timeout = int(math.ceil(((trigger_intro+trigger_outro+trace_outro+2)*ticks*clock_scale*self.primary_clock_period
+ timeout)/self.timeout_clock_period)) + timeout)/self.timeout_clock_period))
if trigger_timeout > vm.Registers.Timeout.maximum_value: if trigger_timeout > vm.Registers.Timeout.maximum_value:
if timeout > 0: if timeout > 0:
raise ConfigurationError("Required trigger timeout too long") raise ConfigurationError("Required trigger timeout too long")
else: raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
raise ConfigurationError("Required trigger timeout too long, use a later trigger position")
Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture " Log.info(f"Begin {('mixed' if logic_channels else 'analogue') if analog_channels else 'logic'} signal capture "
f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})") f"at {sample_rate:,.0f} samples per second (trace mode {capture_mode.trace_mode.name})")
@ -315,7 +325,7 @@ class Scope(vm.VirtualMachine):
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1)) value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
data = await self.read_analog_samples(asamples, capture_mode.sample_width) data = await self.read_analog_samples(asamples, capture_mode.sample_width)
series = DotDict({'channel': channel, series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period, 'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps, 'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)), 'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
'sample_period': sample_period*len(analog_channels), 'sample_period': sample_period*len(analog_channels),
@ -337,7 +347,7 @@ class Scope(vm.VirtualMachine):
mask = 1 << i mask = 1 << i
channel = f'L{i}' channel = f'L{i}'
series = DotDict({'channel': channel, series = DotDict({'channel': channel,
'capture_start': start_timestamp * self.master_clock_period, 'capture_start': start_timestamp * self.primary_clock_period,
'timestamps': timestamps, 'timestamps': timestamps,
'samples': array.array('B', (1 if value & mask else 0 for value in data)), 'samples': array.array('B', (1 if value & mask else 0 for value in data)),
'sample_period': sample_period, 'sample_period': sample_period,
@ -347,7 +357,7 @@ class Scope(vm.VirtualMachine):
series.trigger_timestamp = series.timestamps[trigger_samples] series.trigger_timestamp = series.timestamps[trigger_samples]
series.trigger_level = trigger[i] series.trigger_level = trigger[i]
series.trigger_type = trigger_type series.trigger_type = trigger_type
traces[channel] = series traces[channel] = series
Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}") Log.info(f"{nsamples} samples captured on {cause}, traces: {', '.join(traces)}")
return traces return traces
@ -360,19 +370,19 @@ class Scope(vm.VirtualMachine):
raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})") raise ValueError(f"high out of range (0-{self.awg_maximum_voltage})")
if low < 0 or low > high: if low < 0 or low > high:
raise ValueError("low out of range (0-high)") raise ValueError("low out of range (0-high)")
max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.master_clock_rate / frequency / min_samples))) max_clock = min(vm.Registers.Clock.maximum_value, int(math.floor(self.primary_clock_rate / frequency / min_samples)))
min_clock = max(self.awg_minimum_clock, int(math.ceil(self.master_clock_rate / frequency / self.awg_sample_buffer_size))) min_clock = max(self.awg_minimum_clock, int(math.ceil(self.primary_clock_rate / frequency / self.awg_sample_buffer_size)))
best_solution = None best_solution = None
for clock in range(min_clock, max_clock+1): for clock in range(min_clock, max_clock+1):
width = self.master_clock_rate / frequency / clock width = self.primary_clock_rate / frequency / clock
nwaves = int(self.awg_sample_buffer_size / width) nwaves = int(self.awg_sample_buffer_size / width)
size = int(round(nwaves * width)) size = int(round(nwaves * width))
actualf = self.master_clock_rate * nwaves / size / clock actualf = self.primary_clock_rate * nwaves / size / clock
if actualf == frequency: if actualf == frequency:
Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}") Log.debug(f"Exact solution: size={size} nwaves={nwaves} clock={clock}")
break break
error = abs(frequency - actualf) / frequency error = abs(frequency - actualf) / frequency
if error < max_error and (best_solution is None or error < best_solution[0]): if error < max_error and (best_solution is None or error < best_solution[0]): # noqa
best_solution = error, size, nwaves, clock, actualf best_solution = error, size, nwaves, clock, actualf
else: else:
if best_solution is None: if best_solution is None:
@ -422,9 +432,9 @@ class Scope(vm.VirtualMachine):
async def start_clock(self, frequency, ratio=0.5, max_error=1e-4): async def start_clock(self, frequency, ratio=0.5, max_error=1e-4):
if self._awg_running: if self._awg_running:
raise UsageError("Cannot start clock while waveform generator in use") raise UsageError("Cannot start clock while waveform generator in use")
ticks = min(max(2, int(round(self.master_clock_rate / frequency))), vm.Registers.Clock.maximum_value) ticks = min(max(2, int(round(self.primary_clock_rate / frequency))), vm.Registers.Clock.maximum_value)
fall = min(max(1, int(round(ticks * ratio))), ticks-1) fall = min(max(1, int(round(ticks * ratio))), ticks-1)
actualf, actualr = self.master_clock_rate / ticks, fall / ticks actualf, actualr = self.primary_clock_rate / ticks, fall / ticks
if abs(actualf - frequency) / frequency > max_error: if abs(actualf - frequency) / frequency > max_error:
raise ConfigurationError("No solution to required frequency and max_error") raise ConfigurationError("No solution to required frequency and max_error")
async with self.transaction(): async with self.transaction():
@ -495,11 +505,11 @@ class Scope(vm.VirtualMachine):
for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False): for lo in np.linspace(self.analog_lo_min, 0.5, n, endpoint=False):
for hi in np.linspace(self.analog_hi_max, 0.5, n): for hi in np.linspace(self.analog_hi_max, 0.5, n):
zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0) zero, full, offset = await measure(lo, hi, 2e-3 if len(items) % 4 < 2 else 1e-3, len(items) % 2 == 0)
if zero > 0.01 and full < 0.99 and full > zero: if 0.01 < zero < full < 0.99:
analog_range = self.clock_voltage / (full - zero) analog_range = self.clock_voltage / (full - zero)
items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range)) items.append((lo, hi, -zero*analog_range, (1-zero)*analog_range, offset*analog_range))
await self.stop_clock() await self.stop_clock()
lo, hi, low, high, offset = np.array(items).T lo, hi, low, high, offset = np.array(items).T # noqa
def f(params): def f(params):
dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None)) dl, dh = self.calculate_lo_hi(low, high, self.AnalogParams(*params, analog_scale, analog_offset, None, None, None))
@ -514,7 +524,7 @@ class Scope(vm.VirtualMachine):
Log.info(f"Calibration succeeded: {result.message}") Log.info(f"Calibration succeeded: {result.message}")
params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None) params = self.AnalogParams(*result.x, analog_scale, analog_offset, None, None, None)
def f(x): def f(x): # noqa
lo, hi = self.calculate_lo_hi(x[0], x[1], params) lo, hi = self.calculate_lo_hi(x[0], x[1], params)
return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2) return np.sqrt((self.analog_lo_min - lo)**2 + (self.analog_hi_max - hi)**2)
@ -536,23 +546,21 @@ class Scope(vm.VirtualMachine):
return f"<Scope {self.url}>" return f"<Scope {self.url}>"
""" # $ ipython3 --pylab
$ ipython3 --pylab # Using matplotlib backend: MacOSX
Using matplotlib backend: MacOSX #
# In [1]: run scope
In [1]: run scope #
# In [2]: start_waveform(2000, 'triangle')
In [2]: start_waveform(2000, 'triangle') # Out[2]: 2000.0
Out[2]: 2000.0 #
# In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3)
In [3]: traces = capture(['A','B'], period=1e-3, low=0, high=3.3) #
# In [4]: plot(traces.A.timestamps, traces.A.samples)
In [4]: plot(traces.A.timestamps, traces.A.samples) # Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>]
Out[4]: [<matplotlib.lines.Line2D at 0x10c782160>] #
# In [5]: plot(traces.B.timestamps, traces.B.samples)
In [5]: plot(traces.B.timestamps, traces.B.samples) # Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
Out[5]: [<matplotlib.lines.Line2D at 0x10e6ea320>]
"""
async def main(): async def main():

View File

@ -5,6 +5,8 @@ streams
Package for asynchronous serial IO. Package for asynchronous serial IO.
""" """
# pylama:ignore=W1203,R0916,W0703
import asyncio import asyncio
import logging import logging
import sys import sys
@ -20,14 +22,14 @@ Log = logging.getLogger(__name__)
class SerialStream: class SerialStream:
@classmethod @classmethod
def devices_matching(cls, vid=None, pid=None, serial=None): def devices_matching(cls, vid=None, pid=None, serial_number=None):
for port in comports(): for port in comports():
if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial is None or serial == port.serial_number): if (vid is None or vid == port.vid) and (pid is None or pid == port.pid) and (serial_number is None or serial_number == port.serial_number):
yield port.device yield port.device
@classmethod @classmethod
def stream_matching(cls, vid=None, pid=None, serial=None, **kwargs): def stream_matching(cls, vid=None, pid=None, serial_number=None, **kwargs):
for device in cls.devices_matching(vid, pid, serial): for device in cls.devices_matching(vid, pid, serial_number):
return SerialStream(device, **kwargs) return SerialStream(device, **kwargs)
raise RuntimeError("No matching serial device") raise RuntimeError("No matching serial device")
@ -59,15 +61,15 @@ class SerialStream:
return return
if not self._output_buffer: if not self._output_buffer:
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception: except Exception:
Log.exception("Error writing to stream") Log.exception("Error writing to stream")
raise raise
if n: if nbytes:
Log.debug(f"Write {data[:n]!r}") Log.debug(f"Write {data[:nbytes]!r}")
self._output_buffer = data[n:] self._output_buffer = data[nbytes:]
else: else:
self._output_buffer += data self._output_buffer += data
if self._output_buffer and self._output_buffer_empty is None: if self._output_buffer and self._output_buffer_empty is None:
@ -80,16 +82,16 @@ class SerialStream:
def _feed_data(self): def _feed_data(self):
try: try:
n = self._connection.write(self._output_buffer) nbytes = self._connection.write(self._output_buffer)
except serial.SerialTimeoutException: except serial.SerialTimeoutException:
n = 0 nbytes = 0
except Exception as e: except Exception as exc:
Log.exception("Error writing to stream") Log.exception("Error writing to stream")
self._output_buffer_empty.set_exception(e) self._output_buffer_empty.set_exception(exc)
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
if n: if nbytes:
Log.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
if not self._output_buffer: if not self._output_buffer:
self._loop.remove_writer(self._connection) self._loop.remove_writer(self._connection)
self._output_buffer_empty.set_result(None) self._output_buffer_empty.set_result(None)
@ -101,40 +103,39 @@ class SerialStream:
data = bytes(self._output_buffer) data = bytes(self._output_buffer)
self._output_buffer_lock.release() self._output_buffer_lock.release()
try: try:
n = self._connection.write(data) nbytes = self._connection.write(data)
finally: finally:
self._output_buffer_lock.acquire() self._output_buffer_lock.acquire()
Log.debug(f"Write {self._output_buffer[:n]!r}") Log.debug(f"Write {self._output_buffer[:nbytes]!r}")
self._output_buffer = self._output_buffer[n:] self._output_buffer = self._output_buffer[nbytes:]
self._output_buffer_empty = None self._output_buffer_empty = None
async def read(self, n=None): async def read(self, nbytes=None):
if self._use_threads: if self._use_threads:
return await self._loop.run_in_executor(None, self._read_blocking, n) return await self._loop.run_in_executor(None, self._read_blocking, nbytes)
while True: while True:
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w: if nwaiting:
data = self._connection.read(w if n is None else min(n, w)) data = self._connection.read(nwaiting if nbytes is None else min(nbytes, nwaiting))
Log.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
else: future = self._loop.create_future()
future = self._loop.create_future() self._loop.add_reader(self._connection, future.set_result, None)
self._loop.add_reader(self._connection, future.set_result, None) try:
try: await future
await future finally:
finally: self._loop.remove_reader(self._connection)
self._loop.remove_reader(self._connection)
def _read_blocking(self, n=None): def _read_blocking(self, nbytes=None):
data = self._connection.read(1) data = self._connection.read(1)
w = self._connection.in_waiting nwaiting = self._connection.in_waiting
if w and (n is None or n > 1): if nwaiting and (nbytes is None or nbytes > 1):
data += self._connection.read(w if n is None else min(n-1, w)) data += self._connection.read(nwaiting if nbytes is None else min(nbytes-1, nwaiting))
Log.debug(f"Read {data!r}") Log.debug(f"Read {data!r}")
return data return data
async def readexactly(self, n): async def readexactly(self, nbytes):
data = b'' data = b''
while len(data) < n: while len(data) < nbytes:
data += await self.read(n-len(data)) data += await self.read(nbytes-len(data))
return data return data

View File

@ -1,3 +1,9 @@
"""
utils
=====
Random utility classes/functions.
"""
class DotDict(dict): class DotDict(dict):

103
vm.py
View File

@ -1,4 +1,3 @@
""" """
vm vm
== ==
@ -7,12 +6,14 @@ Package capturing BitScope VM specification, including registers, enumerations,
commands and logic for encoding and decoding virtual machine instructions and data. commands and logic for encoding and decoding virtual machine instructions and data.
All names and descriptions copyright BitScope and taken from their [VM specification All names and descriptions copyright BitScope and taken from their [VM specification
document][VM01B]. document][VM01B] (with slight changes).
[VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY [VM01B]: https://docs.google.com/document/d/1cZNRpSPAMyIyAvIk_mqgEByaaHzbFTX8hWglAMTlnHY
""" """
# pylama:ignore=E221,C0326,R0904,W1203
import array import array
from collections import namedtuple from collections import namedtuple
from enum import IntEnum from enum import IntEnum
@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
else: else:
width = int(self.dtype[1:]) width = int(self.dtype[1:])
if sign == 'U': if sign == 'U':
n = 1 << width max_value = (1 << width) - 1
value = max(0, min(value, n-1)) value = min(max(0, value), max_value)
bs = struct.pack('<I', value) data = struct.pack('<I', value)
elif sign == 'S': elif sign == 'S':
n = 1 << (width - 1) max_value = (1 << (width - 1))
value = max(-n, min(value, n-1)) value = min(max(-max_value, value), max_value - 1)
bs = struct.pack('<i', value) data = struct.pack('<i', value)
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
return bs[:width//8] return data[:width//8]
def decode(self, bs): def decode(self, data):
if len(bs) < 4: if len(data) < 4:
bs = bs + bytes(4 - len(bs)) data = data + bytes(4 - len(data))
sign = self.dtype[0] sign = self.dtype[0]
if sign == 'U': if sign == 'U':
value = struct.unpack('<I', bs)[0] value = struct.unpack('<I', data)[0]
elif sign == 'S': elif sign == 'S':
value = struct.unpack('<i', bs)[0] value = struct.unpack('<i', data)[0]
else: else:
raise TypeError("Unrecognised dtype") raise TypeError("Unrecognised dtype")
if '.' in self.dtype: if '.' in self.dtype:
whole, fraction = map(int, self.dtype[1:].split('.', 1)) fraction = int(self.dtype.split('.', 1)[1])
value = value / (1 << fraction) value /= (1 << fraction)
return value return value
@property @property
@ -69,15 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
whole, fraction = int(self.dtype[1:]), 0 whole, fraction = int(self.dtype[1:]), 0
if self.dtype[0] == 'S': if self.dtype[0] == 'S':
whole -= 1 whole -= 1
n = (1 << (whole+fraction)) - 1 max_value = (1 << (whole+fraction)) - 1
return n / (1 << fraction) if fraction else n return max_value / (1 << fraction) if fraction else max_value
@property @property
def width(self): def width(self):
if '.' in self.dtype: if '.' in self.dtype:
return sum(map(int, self.dtype[1:].split('.', 1))) // 8 return sum(map(int, self.dtype[1:].split('.', 1))) // 8
else: return int(self.dtype[1:]) // 8
return int(self.dtype[1:]) // 8
Registers = DotDict({ Registers = DotDict({
@ -90,8 +90,8 @@ Registers = DotDict({
"TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"), "TriggerOutro": Register(0x34, 'U16', "Edge trigger outro filter counter (samples/2)"),
"TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"), "TriggerValue": Register(0x44, 'S0.16', "Digital (comparator) trigger (signed)"),
"TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"), "TriggerTime": Register(0x40, 'U32', "Stopwatch trigger time (ticks)"),
"ClockTicks": Register(0x2e, 'U16', "Master Sample (clock) period (ticks)"), "ClockTicks": Register(0x2e, 'U16', "Sample period (ticks)"),
"ClockScale": Register(0x14, 'U16', "Clock divide by N (low byte)"), "ClockScale": Register(0x14, 'U16', "Sample clock divider"),
"TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"), "TraceOption": Register(0x20, 'U8', "Trace Mode Option bits"),
"TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"), "TraceMode": Register(0x21, 'U8', "Trace Mode (see Trace Mode Table)"),
"TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"), "TraceIntro": Register(0x26, 'U16', "Pre-trigger capture count (samples)"),
@ -151,13 +151,11 @@ Registers = DotDict({
"Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"), "Map5": Register(0x99, 'U8', "Peripheral Pin Select Channel 5"),
"Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"), "Map6": Register(0x9a, 'U8', "Peripheral Pin Select Channel 6"),
"Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"), "Map7": Register(0x9b, 'U8', "Peripheral Pin Select Channel 7"),
"MasterClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"), "PrimaryClockN": Register(0xf7, 'U8', "PLL prescale (DIV N)"),
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"), "PrimaryClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
}) })
# pylama:ignore=E221
class TraceMode(IntEnum): class TraceMode(IntEnum):
Analog = 0 Analog = 0
Mixed = 1 Mixed = 1
@ -254,12 +252,12 @@ class VirtualMachine:
class Transaction: class Transaction:
def __init__(self, vm): def __init__(self, vm):
self._vm = vm self._vm = vm
self._data = b''
def append(self, cmd): def append(self, cmd):
self._data += cmd self._data += cmd
async def __aenter__(self): async def __aenter__(self):
self._data = b''
self._vm._transactions.append(self) self._vm._transactions.append(self)
return self return self
@ -281,6 +279,8 @@ class VirtualMachine:
self._writer.close() self._writer.close()
self._writer = None self._writer = None
self._reader = None self._reader = None
return True
return False
__del__ = close __del__ = close
@ -300,12 +300,12 @@ class VirtualMachine:
else: else:
self._transactions[-1].append(cmd) self._transactions[-1].append(cmd)
async def read_replies(self, n): async def read_replies(self, nreplies):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
replies = [] replies = []
data, self._reply_buffer = self._reply_buffer, b'' data, self._reply_buffer = self._reply_buffer, b''
while len(replies) < n: while len(replies) < nreplies:
index = data.find(b'\r') index = data.find(b'\r')
if index >= 0: if index >= 0:
reply = data[:index] reply = data[:index]
@ -331,25 +331,25 @@ class VirtualMachine:
async def set_registers(self, **kwargs): async def set_registers(self, **kwargs):
cmd = '' cmd = ''
r0 = r1 = None register0 = register1 = None
for base, name in sorted((Registers[name].base, name) for name in kwargs): for base, name in sorted((Registers[name].base, name) for name in kwargs):
register = Registers[name] register = Registers[name]
bs = register.encode(kwargs[name]) data = register.encode(kwargs[name])
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}") Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
for i, byte in enumerate(bs): for i, byte in enumerate(data):
if cmd: if cmd:
cmd += 'z' cmd += 'z'
r1 += 1 register1 += 1
address = base + i address = base + i
if r1 is None or address > r1 + 3: if register1 is None or address > register1 + 3:
cmd += f'{address:02x}@' cmd += f'{address:02x}@'
r0 = r1 = address register0 = register1 = address
else: else:
cmd += 'n' * (address - r1) cmd += 'n' * (address - register1)
r1 = address register1 = address
if byte != r0: if byte != register0:
cmd += '[' if byte == 0 else f'{byte:02x}' cmd += '[' if byte == 0 else f'{byte:02x}'
r0 = byte register0 = byte
if cmd: if cmd:
await self.issue(cmd + 's') await self.issue(cmd + 's')
@ -382,22 +382,21 @@ class VirtualMachine:
async def issue_triggered_trace(self): async def issue_triggered_trace(self):
await self.issue(b'D') await self.issue(b'D')
async def read_analog_samples(self, n, sample_width): async def read_analog_samples(self, nsamples, sample_width):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
if sample_width == 2: if sample_width == 2:
data = await self._reader.readexactly(2*n) data = await self._reader.readexactly(2 * nsamples)
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data))) return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
elif sample_width == 1: if sample_width == 1:
data = await self._reader.readexactly(n) data = await self._reader.readexactly(nsamples)
return array.array('f', (value/256 for value in data)) return array.array('f', (value/256 for value in data))
else: raise ValueError(f"Bad sample width: {sample_width}")
raise ValueError(f"Bad sample width: {sample_width}")
async def read_logic_samples(self, n): async def read_logic_samples(self, nsamples):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nsamples)
async def issue_cancel_trace(self): async def issue_cancel_trace(self):
await self.issue(b'K') await self.issue(b'K')
@ -411,15 +410,15 @@ class VirtualMachine:
async def issue_wavetable_read(self): async def issue_wavetable_read(self):
await self.issue(b'R') await self.issue(b'R')
async def wavetable_read_bytes(self, n): async def wavetable_read_bytes(self, nbytes):
if self._transactions: if self._transactions:
raise TypeError("Command transaction in progress") raise TypeError("Command transaction in progress")
return await self._reader.readexactly(n) return await self._reader.readexactly(nbytes)
async def wavetable_write_bytes(self, bs): async def wavetable_write_bytes(self, data):
cmd = '' cmd = ''
last_byte = None last_byte = None
for byte in bs: for byte in data:
if byte != last_byte: if byte != last_byte:
cmd += f'{byte:02x}' cmd += f'{byte:02x}'
cmd += 'W' cmd += 'W'