From 942c7e7dbce1dd476fd315788192315e5215d926 Mon Sep 17 00:00:00 2001 From: Jonathan Hogg Date: Fri, 7 Jul 2017 13:53:36 +0100 Subject: [PATCH] Use low latency serial IO on Linux; optimise reading of replies; add pushback buffer to serial stream for the latter --- streams.py | 37 +++++++++++++++++++++++++++++++++++-- vm.py | 25 ++++++++++++++----------- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/streams.py b/streams.py index a7ce047..49cb6f0 100644 --- a/streams.py +++ b/streams.py @@ -2,6 +2,7 @@ import asyncio import logging import os +import sys import serial from serial.tools.list_ports import comports @@ -9,6 +10,29 @@ from serial.tools.list_ports import comports Log = logging.getLogger('streams') +if sys.platform == 'linux': + + import fcntl + import struct + + TIOCGSERIAL = 0x541E + TIOCSSERIAL = 0x541F + ASYNC_LOW_LATENCY = 1<<13 + SERIAL_STRUCT_FORMAT = '2iI5iH2ci2HPHIL' + SERIAL_FLAGS_INDEX = 4 + + def set_low_latency(fd): + data = list(struct.unpack(SERIAL_STRUCT_FORMAT, fcntl.ioctl(fd, TIOCGSERIAL, b'\0'*struct.calcsize(SERIAL_STRUCT_FORMAT)))) + data[SERIAL_FLAGS_INDEX] |= ASYNC_LOW_LATENCY + fcntl.ioctl(fd, TIOCSSERIAL, struct.pack(SERIAL_STRUCT_FORMAT, *data)) + +else: + + def set_low_latency(fd): + pass + + + class SerialStream: @classmethod @@ -21,10 +45,12 @@ class SerialStream: def __init__(self, device, loop=None, **kwargs): self._device = device self._connection = serial.Serial(self._device, timeout=0, write_timeout=0, **kwargs) + set_low_latency(self._connection) Log.debug(f"Opened SerialStream on {device}") self._loop = loop if loop is not None else asyncio.get_event_loop() self._output_buffer = bytes() self._output_buffer_empty = None + self._pushback_buffer = bytes() def __repr__(self): return f'<{self.__class__.__name__}:{self._device}>' @@ -40,7 +66,7 @@ class SerialStream: n = self._connection.write(data) except serial.SerialTimeoutException: n = 0 - except Exception as e: + except: Log.exception("Error writing to stream") raise if n: @@ -72,8 +98,15 @@ class SerialStream: self._loop.remove_writer(self._connection) self._output_buffer_empty.set_result(None) self._output_buffer_empty = None - + + def pushback(self, data): + self._pushback_buffer += data + async def read(self, n=None): + if self._pushback_buffer: + data = self._pushback_buffer if n is None else self._pushback_buffer[:n] + self._pushback_buffer = self._pushback_buffer[len(data):] + return data while True: w = self._connection.in_waiting if w: diff --git a/vm.py b/vm.py index 0742525..67b66a7 100644 --- a/vm.py +++ b/vm.py @@ -238,20 +238,22 @@ class VirtualMachine: else: self._transactions[-1].append(cmd) - async def readuntil(self, separator): - data = b'' - while not data.endswith(separator): - data += await self._reader.read(1) - return data - async def read_replies(self, n): if self._transactions: raise TypeError("Command transaction in progress") replies = [] - for i in range(n): - reply = (await self.readuntil(b'\r'))[:-1] - Log.debug(f"Read reply: {reply!r}") - replies.append(reply) + data = b'' + while len(replies) < n: + index = data.find(b'\r') + if index >= 0: + reply = data[:index] + Log.debug(f"Read reply: {reply!r}") + replies.append(reply) + data = data[index+1:] + else: + data += await self._reader.read() + if data: + self._reader.pushback(data) return replies async def reset(self): @@ -260,7 +262,8 @@ class VirtualMachine: Log.debug("Issue reset") self._writer.write(b'!') await self._writer.drain() - await self.readuntil(b'!') + while not (await self._reader.read()).endswith(b'!'): + pass Log.debug("Reset complete") async def set_registers(self, **kwargs):