mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-14 03:02:09 +01:00
Use Python 3.6 format strings
This commit is contained in:
27
scope.py
27
scope.py
@ -32,10 +32,10 @@ class Scope(vm.VirtualMachine):
|
||||
reader = writer = streams.SerialStream(device=device)
|
||||
elif ':' in device:
|
||||
host, port = device.split(':', 1)
|
||||
Log.info("Connecting to remote scope at {}:{}".format(host, port))
|
||||
Log.info(f"Connecting to remote scope at {host}:{port}")
|
||||
reader, writer = await asyncio.open_connection(host, int(port))
|
||||
else:
|
||||
raise ValueError("Don't know what to do with '{}'".format(device))
|
||||
raise ValueError(f"Don't know what to do with '{device}'")
|
||||
scope = cls(reader, writer)
|
||||
await scope.setup()
|
||||
return scope
|
||||
@ -62,7 +62,7 @@ class Scope(vm.VirtualMachine):
|
||||
self.trigger_high = 10.816
|
||||
# await self.load_params() XXX switch this off until I understand EEPROM better
|
||||
self._generator_running = False
|
||||
Log.info("Initialised scope, revision: {}".format(revision))
|
||||
Log.info(f"Initialised scope, revision: {revision}")
|
||||
|
||||
def close(self):
|
||||
if self._writer is not None:
|
||||
@ -114,7 +114,7 @@ class Scope(vm.VirtualMachine):
|
||||
if clock_mode.dual == dual and ticks in range(clock_mode.clock_low, clock_mode.clock_high + 1):
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Unsupported clock period: {}".format(ticks))
|
||||
raise RuntimeError(f"Unsupported clock period: {ticks}")
|
||||
if clock_mode.clock_max is not None and ticks > clock_mode.clock_max:
|
||||
ticks = clock_mode.clock_max
|
||||
nsamples = int(round(period / ticks / nsamples_multiplier / self.capture_clock_period))
|
||||
@ -189,7 +189,7 @@ class Scope(vm.VirtualMachine):
|
||||
data = await self._reader.readexactly(nsamples * clock_mode.sample_width)
|
||||
value_multiplier, value_offset = (1, 0) if raw else ((high-low), low+self.analog_offsets[channel])
|
||||
if clock_mode.sample_width == 2:
|
||||
data = struct.unpack('>{}h'.format(nsamples), data)
|
||||
data = struct.unpack(f'>{nsamples}h', data)
|
||||
traces[channel] = [(value/65536+0.5)*value_multiplier + value_offset for value in data]
|
||||
else:
|
||||
traces[channel] = [(value/256)*value_multiplier + value_offset for value in data]
|
||||
@ -222,7 +222,7 @@ class Scope(vm.VirtualMachine):
|
||||
await self.issue_synthesize_wavetable()
|
||||
else:
|
||||
if len(wavetable) != self.awg_wavetable_size:
|
||||
raise ValueError("Wavetable data must be {} samples".format(self.awg_wavetable_size))
|
||||
raise ValueError(f"Wavetable data must be {self.awg_wavetable_size} samples")
|
||||
await self.set_registers(Cmd=0, Mode=1, Address=0, Size=1)
|
||||
await self.wavetable_write_bytes(wavetable)
|
||||
await self.set_registers(Cmd=0, Mode=0, Level=vpp / self.awg_maximum_voltage,
|
||||
@ -297,7 +297,7 @@ class Scope(vm.VirtualMachine):
|
||||
offset = items[:, 4].mean()
|
||||
self.analog_offsets = {'A': -offset, 'B': +offset}
|
||||
else:
|
||||
Log.warning("Calibration failed: {}".format(result.message))
|
||||
Log.warning(f"Calibration failed: {result.message}")
|
||||
print(result.message)
|
||||
return result.success
|
||||
|
||||
@ -313,7 +313,7 @@ INFO:scope:Initialised scope, revision: BS000501
|
||||
In [2]: generate(2000, 'triangle')
|
||||
Out[2]: 2000.0
|
||||
|
||||
In [3]: traces = capture('At', low=0, high=3.3)
|
||||
In [3]: traces = capture('tA', low=0, high=3.3)
|
||||
|
||||
In [4]: plot(traces.t, traces.A)
|
||||
Out[4]: [<matplotlib.lines.Line2D at 0x114009160>]
|
||||
@ -331,17 +331,20 @@ async def main():
|
||||
#y = np.round((np.sin(x)**5)*127 + 128, 0).astype('uint8')
|
||||
#await s.start_generator(1000, wavetable=y)
|
||||
|
||||
def await(g):
|
||||
return asyncio.get_event_loop().run_until_complete(g)
|
||||
|
||||
def capture(*args, **kwargs):
|
||||
return asyncio.get_event_loop().run_until_complete(s.capture(*args, **kwargs))
|
||||
return await(s.capture(*args, **kwargs))
|
||||
|
||||
def calibrate(*args, **kwargs):
|
||||
return asyncio.get_event_loop().run_until_complete(s.calibrate(*args, **kwargs))
|
||||
return await(s.calibrate(*args, **kwargs))
|
||||
|
||||
def generate(*args, **kwargs):
|
||||
return asyncio.get_event_loop().run_until_complete(s.start_generator(*args, **kwargs))
|
||||
return await(s.start_generator(*args, **kwargs))
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
|
||||
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
|
||||
asyncio.get_event_loop().run_until_complete(main())
|
||||
|
||||
|
15
streams.py
15
streams.py
@ -20,18 +20,19 @@ class SerialStream:
|
||||
def __init__(self, device, loop=None, **kwargs):
|
||||
self._device = device
|
||||
self._connection = serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
|
||||
Log.debug("Opened SerialStream on {}".format(device))
|
||||
Log.debug(f"Opened SerialStream on {device}")
|
||||
self._loop = loop if loop is not None else asyncio.get_event_loop()
|
||||
self._input_buffer = bytes()
|
||||
self._output_buffer = bytes()
|
||||
self._output_buffer_empty = None
|
||||
|
||||
def __repr__(self):
|
||||
return '<{}:{}>'.format(self.__class__.__name__, self._device)
|
||||
return f'<{self.__class__.__name__}:{self._device}>'
|
||||
|
||||
def close(self):
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
if self._connection is not None:
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
|
||||
def write(self, data):
|
||||
if not self._output_buffer:
|
||||
@ -43,7 +44,7 @@ class SerialStream:
|
||||
Log.exception("Error writing to stream")
|
||||
raise
|
||||
if n:
|
||||
Log.debug('Write %r', data[:n])
|
||||
Log.debug(f"Write {data[:n]!r}")
|
||||
self._output_buffer = data[n:]
|
||||
else:
|
||||
self._output_buffer += data
|
||||
@ -76,7 +77,7 @@ class SerialStream:
|
||||
self._output_buffer_empty.set_exception(e)
|
||||
self.remove_writer(self._connection, self._feed_data)
|
||||
if n:
|
||||
Log.debug('Write %r', self._output_buffer[:n])
|
||||
Log.debug(f"Write {self._output_buffer[:n]!r}")
|
||||
self._output_buffer = self._output_buffer[n:]
|
||||
if not self._output_buffer:
|
||||
self._loop.remove_writer(self._connection)
|
||||
@ -117,7 +118,7 @@ class SerialStream:
|
||||
self._loop.add_reader(self._connection, self._handle_data, n, future)
|
||||
try:
|
||||
data = await future
|
||||
Log.debug('Read %r', data)
|
||||
Log.debug(f"Read {data}")
|
||||
return data
|
||||
finally:
|
||||
self._loop.remove_reader(self._connection)
|
||||
|
14
vm.py
14
vm.py
@ -220,7 +220,7 @@ class VirtualMachine:
|
||||
if isinstance(cmd, str):
|
||||
cmd = cmd.encode('ascii')
|
||||
if not self._transactions:
|
||||
Log.debug("Issue: {}".format(repr(cmd)))
|
||||
Log.debug(f"Issue: {cmd!r}")
|
||||
self._writer.write(cmd)
|
||||
await self._writer.drain()
|
||||
echo = await self._reader.readexactly(len(cmd))
|
||||
@ -241,7 +241,7 @@ class VirtualMachine:
|
||||
replies = []
|
||||
for i in range(n):
|
||||
reply = (await self.readuntil(b'\r'))[:-1]
|
||||
Log.debug("Read reply: {}".format(repr(reply)))
|
||||
Log.debug(f"Read reply: {reply!r}")
|
||||
replies.append(reply)
|
||||
return replies
|
||||
|
||||
@ -260,27 +260,27 @@ class VirtualMachine:
|
||||
for base, name in sorted((Registers[name][0], name) for name in kwargs):
|
||||
base, dtype, desc = Registers[name]
|
||||
bs = encode(kwargs[name], dtype)
|
||||
Log.debug("{} = 0x{}".format(name, ''.join('{:02x}'.format(b) for b in reversed(bs))))
|
||||
Log.debug(f"{name} = 0x{''.join(f'{b:02x}' for b in reversed(bs))}")
|
||||
for i, byte in enumerate(bs):
|
||||
if cmd:
|
||||
cmd += 'z'
|
||||
r1 += 1
|
||||
address = base + i
|
||||
if r1 is None or address > r1 + 3:
|
||||
cmd += '{:02x}@'.format(address)
|
||||
cmd += f'{address:02x}@'
|
||||
r0 = r1 = address
|
||||
else:
|
||||
cmd += 'n' * (address - r1)
|
||||
r1 = address
|
||||
if byte != r0:
|
||||
cmd += '[' if byte == 0 else '{:02x}'.format(byte)
|
||||
cmd += '[' if byte == 0 else f'{byte:02x}'
|
||||
r0 = byte
|
||||
if cmd:
|
||||
await self.issue(cmd + 's')
|
||||
|
||||
async def get_register(self, name):
|
||||
base, dtype, desc = Registers[name]
|
||||
await self.issue('{:02x}@p'.format(base))
|
||||
await self.issue(f'{base:02x}@p')
|
||||
values = []
|
||||
width = calculate_width(dtype)
|
||||
for i in range(width):
|
||||
@ -324,7 +324,7 @@ class VirtualMachine:
|
||||
last_byte = None
|
||||
for byte in bs:
|
||||
if byte != last_byte:
|
||||
cmd += '{:02x}'.format(byte)
|
||||
cmd += f'{byte:02x}'
|
||||
cmd += 'W'
|
||||
last_byte = byte
|
||||
if cmd:
|
||||
|
Reference in New Issue
Block a user