mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-14 03:02:09 +01:00
Lots of linter changes.
This commit is contained in:
91
vm.py
91
vm.py
@ -1,4 +1,3 @@
|
||||
|
||||
"""
|
||||
vm
|
||||
==
|
||||
@ -13,6 +12,8 @@ document][VM01B].
|
||||
|
||||
"""
|
||||
|
||||
# pylama:ignore=E221,C0326,R0904,W1203
|
||||
|
||||
import array
|
||||
from collections import namedtuple
|
||||
from enum import IntEnum
|
||||
@ -35,30 +36,30 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
||||
else:
|
||||
width = int(self.dtype[1:])
|
||||
if sign == 'U':
|
||||
n = 1 << width
|
||||
value = max(0, min(value, n-1))
|
||||
bs = struct.pack('<I', value)
|
||||
max_value = (1 << width) - 1
|
||||
value = min(max(0, value), max_value)
|
||||
data = struct.pack('<I', value)
|
||||
elif sign == 'S':
|
||||
n = 1 << (width - 1)
|
||||
value = max(-n, min(value, n-1))
|
||||
bs = struct.pack('<i', value)
|
||||
max_value = (1 << (width - 1))
|
||||
value = min(max(-max_value, value), max_value - 1)
|
||||
data = struct.pack('<i', value)
|
||||
else:
|
||||
raise TypeError("Unrecognised dtype")
|
||||
return bs[:width//8]
|
||||
return data[:width//8]
|
||||
|
||||
def decode(self, bs):
|
||||
if len(bs) < 4:
|
||||
bs = bs + bytes(4 - len(bs))
|
||||
def decode(self, data):
|
||||
if len(data) < 4:
|
||||
data = data + bytes(4 - len(data))
|
||||
sign = self.dtype[0]
|
||||
if sign == 'U':
|
||||
value = struct.unpack('<I', bs)[0]
|
||||
value = struct.unpack('<I', data)[0]
|
||||
elif sign == 'S':
|
||||
value = struct.unpack('<i', bs)[0]
|
||||
value = struct.unpack('<i', data)[0]
|
||||
else:
|
||||
raise TypeError("Unrecognised dtype")
|
||||
if '.' in self.dtype:
|
||||
whole, fraction = map(int, self.dtype[1:].split('.', 1))
|
||||
value = value / (1 << fraction)
|
||||
fraction = int(self.dtype.split('.', 1)[1])
|
||||
value /= (1 << fraction)
|
||||
return value
|
||||
|
||||
@property
|
||||
@ -69,15 +70,14 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
||||
whole, fraction = int(self.dtype[1:]), 0
|
||||
if self.dtype[0] == 'S':
|
||||
whole -= 1
|
||||
n = (1 << (whole+fraction)) - 1
|
||||
return n / (1 << fraction) if fraction else n
|
||||
max_value = (1 << (whole+fraction)) - 1
|
||||
return max_value / (1 << fraction) if fraction else max_value
|
||||
|
||||
@property
|
||||
def width(self):
|
||||
if '.' in self.dtype:
|
||||
return sum(map(int, self.dtype[1:].split('.', 1))) // 8
|
||||
else:
|
||||
return int(self.dtype[1:]) // 8
|
||||
return int(self.dtype[1:]) // 8
|
||||
|
||||
|
||||
Registers = DotDict({
|
||||
@ -156,8 +156,6 @@ Registers = DotDict({
|
||||
})
|
||||
|
||||
|
||||
# pylama:ignore=E221
|
||||
|
||||
class TraceMode(IntEnum):
|
||||
Analog = 0
|
||||
Mixed = 1
|
||||
@ -254,12 +252,12 @@ class VirtualMachine:
|
||||
class Transaction:
|
||||
def __init__(self, vm):
|
||||
self._vm = vm
|
||||
self._data = b''
|
||||
|
||||
def append(self, cmd):
|
||||
self._data += cmd
|
||||
|
||||
async def __aenter__(self):
|
||||
self._data = b''
|
||||
self._vm._transactions.append(self)
|
||||
return self
|
||||
|
||||
@ -302,12 +300,12 @@ class VirtualMachine:
|
||||
else:
|
||||
self._transactions[-1].append(cmd)
|
||||
|
||||
async def read_replies(self, n):
|
||||
async def read_replies(self, nreplies):
|
||||
if self._transactions:
|
||||
raise TypeError("Command transaction in progress")
|
||||
replies = []
|
||||
data, self._reply_buffer = self._reply_buffer, b''
|
||||
while len(replies) < n:
|
||||
while len(replies) < nreplies:
|
||||
index = data.find(b'\r')
|
||||
if index >= 0:
|
||||
reply = data[:index]
|
||||
@ -333,25 +331,25 @@ class VirtualMachine:
|
||||
|
||||
async def set_registers(self, **kwargs):
|
||||
cmd = ''
|
||||
r0 = r1 = None
|
||||
register0 = register1 = None
|
||||
for base, name in sorted((Registers[name].base, name) for name in kwargs):
|
||||
register = Registers[name]
|
||||
bs = register.encode(kwargs[name])
|
||||
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
|
||||
for i, byte in enumerate(bs):
|
||||
data = register.encode(kwargs[name])
|
||||
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(data))}")
|
||||
for i, byte in enumerate(data):
|
||||
if cmd:
|
||||
cmd += 'z'
|
||||
r1 += 1
|
||||
register1 += 1
|
||||
address = base + i
|
||||
if r1 is None or address > r1 + 3:
|
||||
if register1 is None or address > register1 + 3:
|
||||
cmd += f'{address:02x}@'
|
||||
r0 = r1 = address
|
||||
register0 = register1 = address
|
||||
else:
|
||||
cmd += 'n' * (address - r1)
|
||||
r1 = address
|
||||
if byte != r0:
|
||||
cmd += 'n' * (address - register1)
|
||||
register1 = address
|
||||
if byte != register0:
|
||||
cmd += '[' if byte == 0 else f'{byte:02x}'
|
||||
r0 = byte
|
||||
register0 = byte
|
||||
if cmd:
|
||||
await self.issue(cmd + 's')
|
||||
|
||||
@ -384,22 +382,21 @@ class VirtualMachine:
|
||||
async def issue_triggered_trace(self):
|
||||
await self.issue(b'D')
|
||||
|
||||
async def read_analog_samples(self, n, sample_width):
|
||||
async def read_analog_samples(self, nsamples, sample_width):
|
||||
if self._transactions:
|
||||
raise TypeError("Command transaction in progress")
|
||||
if sample_width == 2:
|
||||
data = await self._reader.readexactly(2*n)
|
||||
data = await self._reader.readexactly(2 * nsamples)
|
||||
return array.array('f', ((value+32768)/65536 for (value,) in struct.iter_unpack('>h', data)))
|
||||
elif sample_width == 1:
|
||||
data = await self._reader.readexactly(n)
|
||||
if sample_width == 1:
|
||||
data = await self._reader.readexactly(nsamples)
|
||||
return array.array('f', (value/256 for value in data))
|
||||
else:
|
||||
raise ValueError(f"Bad sample width: {sample_width}")
|
||||
raise ValueError(f"Bad sample width: {sample_width}")
|
||||
|
||||
async def read_logic_samples(self, n):
|
||||
async def read_logic_samples(self, nsamples):
|
||||
if self._transactions:
|
||||
raise TypeError("Command transaction in progress")
|
||||
return await self._reader.readexactly(n)
|
||||
return await self._reader.readexactly(nsamples)
|
||||
|
||||
async def issue_cancel_trace(self):
|
||||
await self.issue(b'K')
|
||||
@ -413,15 +410,15 @@ class VirtualMachine:
|
||||
async def issue_wavetable_read(self):
|
||||
await self.issue(b'R')
|
||||
|
||||
async def wavetable_read_bytes(self, n):
|
||||
async def wavetable_read_bytes(self, nbytes):
|
||||
if self._transactions:
|
||||
raise TypeError("Command transaction in progress")
|
||||
return await self._reader.readexactly(n)
|
||||
return await self._reader.readexactly(nbytes)
|
||||
|
||||
async def wavetable_write_bytes(self, bs):
|
||||
async def wavetable_write_bytes(self, data):
|
||||
cmd = ''
|
||||
last_byte = None
|
||||
for byte in bs:
|
||||
for byte in data:
|
||||
if byte != last_byte:
|
||||
cmd += f'{byte:02x}'
|
||||
cmd += 'W'
|
||||
|
Reference in New Issue
Block a user