mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-13 18:52:10 +01:00
Oops! Missed one. More linter cleaning-up
This commit is contained in:
52
vm.py
52
vm.py
@ -14,7 +14,6 @@ document][VM01B].
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import array
|
import array
|
||||||
import asyncio
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
import logging
|
import logging
|
||||||
@ -23,7 +22,7 @@ import struct
|
|||||||
from utils import DotDict
|
from utils import DotDict
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
Log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
||||||
@ -46,6 +45,7 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
|||||||
else:
|
else:
|
||||||
raise TypeError("Unrecognised dtype")
|
raise TypeError("Unrecognised dtype")
|
||||||
return bs[:width//8]
|
return bs[:width//8]
|
||||||
|
|
||||||
def decode(self, bs):
|
def decode(self, bs):
|
||||||
if len(bs) < 4:
|
if len(bs) < 4:
|
||||||
bs = bs + bytes(4 - len(bs))
|
bs = bs + bytes(4 - len(bs))
|
||||||
@ -60,6 +60,7 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
|||||||
whole, fraction = map(int, self.dtype[1:].split('.', 1))
|
whole, fraction = map(int, self.dtype[1:].split('.', 1))
|
||||||
value = value / (1 << fraction)
|
value = value / (1 << fraction)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def maximum_value(self):
|
def maximum_value(self):
|
||||||
if '.' in self.dtype:
|
if '.' in self.dtype:
|
||||||
@ -68,8 +69,9 @@ class Register(namedtuple('Register', ['base', 'dtype', 'description'])):
|
|||||||
whole, fraction = int(self.dtype[1:]), 0
|
whole, fraction = int(self.dtype[1:]), 0
|
||||||
if self.dtype[0] == 'S':
|
if self.dtype[0] == 'S':
|
||||||
whole -= 1
|
whole -= 1
|
||||||
n = (1<<(whole+fraction)) - 1
|
n = (1 << (whole+fraction)) - 1
|
||||||
return n / (1<<fraction) if fraction else n
|
return n / (1 << fraction) if fraction else n
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def width(self):
|
def width(self):
|
||||||
if '.' in self.dtype:
|
if '.' in self.dtype:
|
||||||
@ -153,15 +155,18 @@ Registers = DotDict({
|
|||||||
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
|
"MasterClockM": Register(0xf8, 'U16', "PLL multiplier (MUL M)"),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
# pylama:ignore=E221
|
||||||
|
|
||||||
class TraceMode(IntEnum):
|
class TraceMode(IntEnum):
|
||||||
Analog = 0
|
Analog = 0
|
||||||
Mixed = 1
|
Mixed = 1
|
||||||
AnalogChop = 2
|
AnalogChop = 2
|
||||||
MixedChop = 3
|
MixedChop = 3
|
||||||
AnalogFast = 4
|
AnalogFast = 4
|
||||||
MixedFast = 5
|
MixedFast = 5
|
||||||
AnalogFastChop = 6
|
AnalogFastChop = 6
|
||||||
MixedFastChop = 7
|
MixedFastChop = 7
|
||||||
AnalogShot = 11
|
AnalogShot = 11
|
||||||
MixedShot = 12
|
MixedShot = 12
|
||||||
LogicShot = 13
|
LogicShot = 13
|
||||||
@ -172,6 +177,7 @@ class TraceMode(IntEnum):
|
|||||||
Macro = 18
|
Macro = 18
|
||||||
MacroChop = 19
|
MacroChop = 19
|
||||||
|
|
||||||
|
|
||||||
class BufferMode(IntEnum):
|
class BufferMode(IntEnum):
|
||||||
Single = 0
|
Single = 0
|
||||||
Chop = 1
|
Chop = 1
|
||||||
@ -180,6 +186,7 @@ class BufferMode(IntEnum):
|
|||||||
Macro = 4
|
Macro = 4
|
||||||
MacroChop = 5
|
MacroChop = 5
|
||||||
|
|
||||||
|
|
||||||
class DumpMode(IntEnum):
|
class DumpMode(IntEnum):
|
||||||
Raw = 0
|
Raw = 0
|
||||||
Burst = 1
|
Burst = 1
|
||||||
@ -190,6 +197,7 @@ class DumpMode(IntEnum):
|
|||||||
Filter = 6
|
Filter = 6
|
||||||
Span = 7
|
Span = 7
|
||||||
|
|
||||||
|
|
||||||
class SpockOption(IntEnum):
|
class SpockOption(IntEnum):
|
||||||
TriggerInvert = 0x40
|
TriggerInvert = 0x40
|
||||||
TriggerSourceA = 0x04 * 0
|
TriggerSourceA = 0x04 * 0
|
||||||
@ -198,23 +206,28 @@ class SpockOption(IntEnum):
|
|||||||
TriggerTypeSampledAnalog = 0x01 * 0
|
TriggerTypeSampledAnalog = 0x01 * 0
|
||||||
TriggerTypeHardwareComparator = 0x01 * 1
|
TriggerTypeHardwareComparator = 0x01 * 1
|
||||||
|
|
||||||
|
|
||||||
class KitchenSinkA(IntEnum):
|
class KitchenSinkA(IntEnum):
|
||||||
ChannelAComparatorEnable = 0x80
|
ChannelAComparatorEnable = 0x80
|
||||||
ChannelBComparatorEnable = 0x40
|
ChannelBComparatorEnable = 0x40
|
||||||
|
|
||||||
|
|
||||||
class KitchenSinkB(IntEnum):
|
class KitchenSinkB(IntEnum):
|
||||||
AnalogFilterEnable = 0x80
|
AnalogFilterEnable = 0x80
|
||||||
WaveformGeneratorEnable = 0x40
|
WaveformGeneratorEnable = 0x40
|
||||||
|
|
||||||
|
|
||||||
class TraceStatus(IntEnum):
|
class TraceStatus(IntEnum):
|
||||||
Done = 0x00
|
Done = 0x00
|
||||||
Auto = 0x01
|
Auto = 0x01
|
||||||
Wait = 0x02
|
Wait = 0x02
|
||||||
Stop = 0x03
|
Stop = 0x03
|
||||||
|
|
||||||
|
|
||||||
CaptureMode = namedtuple('CaptureMode', ('trace_mode', 'clock_low', 'clock_high', 'clock_divide',
|
CaptureMode = namedtuple('CaptureMode', ('trace_mode', 'clock_low', 'clock_high', 'clock_divide',
|
||||||
'analog_channels', 'sample_width', 'logic_channels', 'buffer_mode'))
|
'analog_channels', 'sample_width', 'logic_channels', 'buffer_mode'))
|
||||||
|
|
||||||
|
|
||||||
CaptureModes = [
|
CaptureModes = [
|
||||||
CaptureMode(TraceMode.Macro, 40, 16384, 1, 1, 2, False, BufferMode.Macro),
|
CaptureMode(TraceMode.Macro, 40, 16384, 1, 1, 2, False, BufferMode.Macro),
|
||||||
CaptureMode(TraceMode.MacroChop, 40, 16384, 1, 2, 2, False, BufferMode.MacroChop),
|
CaptureMode(TraceMode.MacroChop, 40, 16384, 1, 2, 2, False, BufferMode.MacroChop),
|
||||||
@ -241,12 +254,15 @@ class VirtualMachine:
|
|||||||
class Transaction:
|
class Transaction:
|
||||||
def __init__(self, vm):
|
def __init__(self, vm):
|
||||||
self._vm = vm
|
self._vm = vm
|
||||||
|
|
||||||
def append(self, cmd):
|
def append(self, cmd):
|
||||||
self._data += cmd
|
self._data += cmd
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
self._data = b''
|
self._data = b''
|
||||||
self._vm._transactions.append(self)
|
self._vm._transactions.append(self)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||||
if self._vm._transactions.pop() != self:
|
if self._vm._transactions.pop() != self:
|
||||||
raise RuntimeError("Mis-ordered transactions")
|
raise RuntimeError("Mis-ordered transactions")
|
||||||
@ -275,7 +291,7 @@ class VirtualMachine:
|
|||||||
if isinstance(cmd, str):
|
if isinstance(cmd, str):
|
||||||
cmd = cmd.encode('ascii')
|
cmd = cmd.encode('ascii')
|
||||||
if not self._transactions:
|
if not self._transactions:
|
||||||
LOG.debug(f"Issue: {cmd!r}")
|
Log.debug(f"Issue: {cmd!r}")
|
||||||
self._writer.write(cmd)
|
self._writer.write(cmd)
|
||||||
await self._writer.drain()
|
await self._writer.drain()
|
||||||
echo = await self._reader.readexactly(len(cmd))
|
echo = await self._reader.readexactly(len(cmd))
|
||||||
@ -293,7 +309,7 @@ class VirtualMachine:
|
|||||||
index = data.find(b'\r')
|
index = data.find(b'\r')
|
||||||
if index >= 0:
|
if index >= 0:
|
||||||
reply = data[:index]
|
reply = data[:index]
|
||||||
LOG.debug(f"Read reply: {reply!r}")
|
Log.debug(f"Read reply: {reply!r}")
|
||||||
replies.append(reply)
|
replies.append(reply)
|
||||||
data = data[index+1:]
|
data = data[index+1:]
|
||||||
else:
|
else:
|
||||||
@ -305,13 +321,13 @@ class VirtualMachine:
|
|||||||
async def issue_reset(self):
|
async def issue_reset(self):
|
||||||
if self._transactions:
|
if self._transactions:
|
||||||
raise TypeError("Command transaction in progress")
|
raise TypeError("Command transaction in progress")
|
||||||
LOG.debug("Issue reset")
|
Log.debug("Issue reset")
|
||||||
self._writer.write(b'!')
|
self._writer.write(b'!')
|
||||||
await self._writer.drain()
|
await self._writer.drain()
|
||||||
while not (await self._reader.read(1000)).endswith(b'!'):
|
while not (await self._reader.read(1000)).endswith(b'!'):
|
||||||
pass
|
pass
|
||||||
self._reply_buffer = b''
|
self._reply_buffer = b''
|
||||||
LOG.debug("Reset complete")
|
Log.debug("Reset complete")
|
||||||
|
|
||||||
async def set_registers(self, **kwargs):
|
async def set_registers(self, **kwargs):
|
||||||
cmd = ''
|
cmd = ''
|
||||||
@ -319,7 +335,7 @@ class VirtualMachine:
|
|||||||
for base, name in sorted((Registers[name].base, name) for name in kwargs):
|
for base, name in sorted((Registers[name].base, name) for name in kwargs):
|
||||||
register = Registers[name]
|
register = Registers[name]
|
||||||
bs = register.encode(kwargs[name])
|
bs = register.encode(kwargs[name])
|
||||||
LOG.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
|
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
|
||||||
for i, byte in enumerate(bs):
|
for i, byte in enumerate(bs):
|
||||||
if cmd:
|
if cmd:
|
||||||
cmd += 'z'
|
cmd += 'z'
|
||||||
@ -425,5 +441,3 @@ class VirtualMachine:
|
|||||||
|
|
||||||
async def issue_write_eeprom(self):
|
async def issue_write_eeprom(self):
|
||||||
await self.issue(b'w')
|
await self.issue(b'w')
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user