mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-14 03:02:09 +01:00
Use low latency serial IO on Linux; optimise reading of replies; add pushback buffer to serial stream for the latter
This commit is contained in:
37
streams.py
37
streams.py
@ -2,6 +2,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
import serial
|
import serial
|
||||||
from serial.tools.list_ports import comports
|
from serial.tools.list_ports import comports
|
||||||
|
|
||||||
@ -9,6 +10,29 @@ from serial.tools.list_ports import comports
|
|||||||
Log = logging.getLogger('streams')
|
Log = logging.getLogger('streams')
|
||||||
|
|
||||||
|
|
||||||
|
if sys.platform == 'linux':
|
||||||
|
|
||||||
|
import fcntl
|
||||||
|
import struct
|
||||||
|
|
||||||
|
TIOCGSERIAL = 0x541E
|
||||||
|
TIOCSSERIAL = 0x541F
|
||||||
|
ASYNC_LOW_LATENCY = 1<<13
|
||||||
|
SERIAL_STRUCT_FORMAT = '2iI5iH2ci2HPHIL'
|
||||||
|
SERIAL_FLAGS_INDEX = 4
|
||||||
|
|
||||||
|
def set_low_latency(fd):
|
||||||
|
data = list(struct.unpack(SERIAL_STRUCT_FORMAT, fcntl.ioctl(fd, TIOCGSERIAL, b'\0'*struct.calcsize(SERIAL_STRUCT_FORMAT))))
|
||||||
|
data[SERIAL_FLAGS_INDEX] |= ASYNC_LOW_LATENCY
|
||||||
|
fcntl.ioctl(fd, TIOCSSERIAL, struct.pack(SERIAL_STRUCT_FORMAT, *data))
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
def set_low_latency(fd):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class SerialStream:
|
class SerialStream:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -21,10 +45,12 @@ class SerialStream:
|
|||||||
def __init__(self, device, loop=None, **kwargs):
|
def __init__(self, device, loop=None, **kwargs):
|
||||||
self._device = device
|
self._device = device
|
||||||
self._connection = serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
|
self._connection = serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs)
|
||||||
|
set_low_latency(self._connection)
|
||||||
Log.debug(f"Opened SerialStream on {device}")
|
Log.debug(f"Opened SerialStream on {device}")
|
||||||
self._loop = loop if loop is not None else asyncio.get_event_loop()
|
self._loop = loop if loop is not None else asyncio.get_event_loop()
|
||||||
self._output_buffer = bytes()
|
self._output_buffer = bytes()
|
||||||
self._output_buffer_empty = None
|
self._output_buffer_empty = None
|
||||||
|
self._pushback_buffer = bytes()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f'<{self.__class__.__name__}:{self._device}>'
|
return f'<{self.__class__.__name__}:{self._device}>'
|
||||||
@ -40,7 +66,7 @@ class SerialStream:
|
|||||||
n = self._connection.write(data)
|
n = self._connection.write(data)
|
||||||
except serial.SerialTimeoutException:
|
except serial.SerialTimeoutException:
|
||||||
n = 0
|
n = 0
|
||||||
except Exception as e:
|
except:
|
||||||
Log.exception("Error writing to stream")
|
Log.exception("Error writing to stream")
|
||||||
raise
|
raise
|
||||||
if n:
|
if n:
|
||||||
@ -72,8 +98,15 @@ class SerialStream:
|
|||||||
self._loop.remove_writer(self._connection)
|
self._loop.remove_writer(self._connection)
|
||||||
self._output_buffer_empty.set_result(None)
|
self._output_buffer_empty.set_result(None)
|
||||||
self._output_buffer_empty = None
|
self._output_buffer_empty = None
|
||||||
|
|
||||||
|
def pushback(self, data):
|
||||||
|
self._pushback_buffer += data
|
||||||
|
|
||||||
async def read(self, n=None):
|
async def read(self, n=None):
|
||||||
|
if self._pushback_buffer:
|
||||||
|
data = self._pushback_buffer if n is None else self._pushback_buffer[:n]
|
||||||
|
self._pushback_buffer = self._pushback_buffer[len(data):]
|
||||||
|
return data
|
||||||
while True:
|
while True:
|
||||||
w = self._connection.in_waiting
|
w = self._connection.in_waiting
|
||||||
if w:
|
if w:
|
||||||
|
25
vm.py
25
vm.py
@ -238,20 +238,22 @@ class VirtualMachine:
|
|||||||
else:
|
else:
|
||||||
self._transactions[-1].append(cmd)
|
self._transactions[-1].append(cmd)
|
||||||
|
|
||||||
async def readuntil(self, separator):
|
|
||||||
data = b''
|
|
||||||
while not data.endswith(separator):
|
|
||||||
data += await self._reader.read(1)
|
|
||||||
return data
|
|
||||||
|
|
||||||
async def read_replies(self, n):
|
async def read_replies(self, n):
|
||||||
if self._transactions:
|
if self._transactions:
|
||||||
raise TypeError("Command transaction in progress")
|
raise TypeError("Command transaction in progress")
|
||||||
replies = []
|
replies = []
|
||||||
for i in range(n):
|
data = b''
|
||||||
reply = (await self.readuntil(b'\r'))[:-1]
|
while len(replies) < n:
|
||||||
Log.debug(f"Read reply: {reply!r}")
|
index = data.find(b'\r')
|
||||||
replies.append(reply)
|
if index >= 0:
|
||||||
|
reply = data[:index]
|
||||||
|
Log.debug(f"Read reply: {reply!r}")
|
||||||
|
replies.append(reply)
|
||||||
|
data = data[index+1:]
|
||||||
|
else:
|
||||||
|
data += await self._reader.read()
|
||||||
|
if data:
|
||||||
|
self._reader.pushback(data)
|
||||||
return replies
|
return replies
|
||||||
|
|
||||||
async def reset(self):
|
async def reset(self):
|
||||||
@ -260,7 +262,8 @@ class VirtualMachine:
|
|||||||
Log.debug("Issue reset")
|
Log.debug("Issue reset")
|
||||||
self._writer.write(b'!')
|
self._writer.write(b'!')
|
||||||
await self._writer.drain()
|
await self._writer.drain()
|
||||||
await self.readuntil(b'!')
|
while not (await self._reader.read()).endswith(b'!'):
|
||||||
|
pass
|
||||||
Log.debug("Reset complete")
|
Log.debug("Reset complete")
|
||||||
|
|
||||||
async def set_registers(self, **kwargs):
|
async def set_registers(self, **kwargs):
|
||||||
|
Reference in New Issue
Block a user