diff --git a/streams.py b/streams.py index 17e95bd..c7822b2 100644 --- a/streams.py +++ b/streams.py @@ -16,7 +16,7 @@ class SerialStream: self._loop = loop if loop is not None else asyncio.get_event_loop() self._input_buffer = bytes() self._output_buffer = bytes() - self._output_wait = None + self._output_buffer_empty = None def __repr__(self): return '<{}:{}>'.format(self.__class__.__name__, self._device) @@ -26,23 +26,54 @@ class SerialStream: self._connection = None def write(self, data): - self._output_buffer += data - if self._output_wait is None: - self._output_wait = asyncio.Future() + if not self._output_buffer: + try: + n = self._connection.write(data) + except serial.SerialTimeoutException: + n = 0 + except Exception as e: + Log.exception("Error writing to stream") + raise + if n: + Log.debug('Write %r', data[:n]) + self._output_buffer = data[n:] + else: + self._output_buffer += data + if self._output_buffer and self._output_buffer_empty is None: + self._output_buffer_empty = self._loop.create_future() self._loop.add_writer(self._connection, self._feed_data) - + + async def send_break(self): + baudrate = self._connection.baudrate + await self.drain() + self._connection.baudrate = 600 + self.write(b'\0') + await self.drain() + self._connection.baudrate = baudrate + async def drain(self): - if self._output_wait is not None: - await self._output_wait + if self._output_buffer_empty is not None: + await self._output_buffer_empty + n = self._connection.out_waiting + if n: + await asyncio.sleep(n * 10 / self._connection.baudrate) def _feed_data(self): - n = self._connection.write(self._output_buffer) - Log.debug('Write {}'.format(repr(self._output_buffer[:n]))) - self._output_buffer = self._output_buffer[n:] + try: + n = self._connection.write(self._output_buffer) + except serial.SerialTimeoutException: + n = 0 + except Exception as e: + Log.exception("Error writing to stream") + self._output_buffer_empty.set_exception(e) + self.remove_writer(self._connection, self._feed_data) + if n: + Log.debug('Write %r', self._output_buffer[:n]) + self._output_buffer = self._output_buffer[n:] if not self._output_buffer: self._loop.remove_writer(self._connection) - self._output_wait.set_result(None) - self._output_wait = None + self._output_buffer_empty.set_result(None) + self._output_buffer_empty = None async def read(self, n=None): while True: @@ -73,15 +104,19 @@ class SerialStream: return data self._input_buffer += await self._read() - def _read(self, n=None): - future = asyncio.Future() + async def _read(self, n=None): + future = self._loop.create_future() self._loop.add_reader(self._connection, self._handle_data, n, future) - return future + try: + data = await future + Log.debug('Read %r', data) + return data + finally: + self._loop.remove_reader(self._connection) def _handle_data(self, n, future): - data = self._connection.read(n if n is not None else self._connection.in_waiting) - Log.debug('Read {}'.format(repr(data))) - future.set_result(data) - self._loop.remove_reader(self._connection) + if not future.cancelled(): + data = self._connection.read(n if n is not None else self._connection.in_waiting) + future.set_result(data)