summaryrefslogtreecommitdiffstats
path: root/scripts/serial/urlhandler
diff options
context:
space:
mode:
authorJan Luebbe <jlu@pengutronix.de>2015-06-08 12:09:34 +0200
committerSascha Hauer <s.hauer@pengutronix.de>2016-01-18 09:25:50 +0100
commit80caf8ac4300c36f21479f38699ac665082d9b39 (patch)
treea31d9e5ddb6fbff66af56768bcdaba7721a833e2 /scripts/serial/urlhandler
parentec95d547e721d2762c3cb66572528028f2a56483 (diff)
downloadbarebox-80caf8ac4300c36f21479f38699ac665082d9b39.tar.gz
barebox-80caf8ac4300c36f21479f38699ac665082d9b39.tar.xz
include pyserial trunk
The current pyserial is broken, this version contains the fix for: http://sourceforge.net/p/pyserial/bugs/166/ Signed-off-by: Jan Luebbe <jlu@pengutronix.de>
Diffstat (limited to 'scripts/serial/urlhandler')
-rw-r--r--scripts/serial/urlhandler/__init__.py0
-rw-r--r--scripts/serial/urlhandler/protocol_hwgrep.py45
-rw-r--r--scripts/serial/urlhandler/protocol_loop.py279
-rw-r--r--scripts/serial/urlhandler/protocol_rfc2217.py11
-rw-r--r--scripts/serial/urlhandler/protocol_socket.py291
5 files changed, 626 insertions, 0 deletions
diff --git a/scripts/serial/urlhandler/__init__.py b/scripts/serial/urlhandler/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/scripts/serial/urlhandler/__init__.py
diff --git a/scripts/serial/urlhandler/protocol_hwgrep.py b/scripts/serial/urlhandler/protocol_hwgrep.py
new file mode 100644
index 0000000000..62cda43aa7
--- /dev/null
+++ b/scripts/serial/urlhandler/protocol_hwgrep.py
@@ -0,0 +1,45 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# This module implements a special URL handler that uses the port listing to
+# find ports by searching the string descriptions.
+#
+# (C) 2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# URL format: hwgrep://regexp
+
+import serial
+import serial.tools.list_ports
+
+class Serial(serial.Serial):
+ """Just inherit the native Serial port implementation and patch the open function."""
+
+ def setPort(self, value):
+ """translate port name before storing it"""
+ if isinstance(value, basestring) and value.startswith('hwgrep://'):
+ serial.Serial.setPort(self, self.fromURL(value))
+ else:
+ serial.Serial.setPort(self, value)
+
+ def fromURL(self, url):
+ """extract host and port from an URL string"""
+ if url.lower().startswith("hwgrep://"): url = url[9:]
+ # use a for loop to get the 1st element from the generator
+ for port, desc, hwid in serial.tools.list_ports.grep(url):
+ return port
+ else:
+ raise serial.SerialException('no ports found matching regexp %r' % (url,))
+
+ # override property
+ port = property(serial.Serial.getPort, setPort, doc="Port setting")
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+if __name__ == '__main__':
+ #~ s = Serial('hwgrep://ttyS0')
+ s = Serial(None)
+ s.port = 'hwgrep://ttyS0'
+ print s
+
diff --git a/scripts/serial/urlhandler/protocol_loop.py b/scripts/serial/urlhandler/protocol_loop.py
new file mode 100644
index 0000000000..a414839761
--- /dev/null
+++ b/scripts/serial/urlhandler/protocol_loop.py
@@ -0,0 +1,279 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# This module implements a loop back connection receiving itself what it sent.
+#
+# The purpose of this module is.. well... You can run the unit tests with it.
+# and it was so easy to implement ;-)
+#
+# (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# URL format: loop://[option[/option...]]
+# options:
+# - "debug" print diagnostic messages
+
+from serial.serialutil import *
+import threading
+import time
+import logging
+
+# map log level names to constants. used in fromURL()
+LOGGER_LEVELS = {
+ 'debug': logging.DEBUG,
+ 'info': logging.INFO,
+ 'warning': logging.WARNING,
+ 'error': logging.ERROR,
+ }
+
+
+class LoopbackSerial(SerialBase):
+ """Serial port implementation that simulates a loop back connection in plain software."""
+
+ BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
+ 9600, 19200, 38400, 57600, 115200)
+
+ def open(self):
+ """\
+ Open port with current settings. This may throw a SerialException
+ if the port cannot be opened.
+ """
+ if self._isOpen:
+ raise SerialException("Port is already open.")
+ self.logger = None
+ self.buffer_lock = threading.Lock()
+ self.loop_buffer = bytearray()
+ self.cts = False
+ self.dsr = False
+
+ if self._port is None:
+ raise SerialException("Port must be configured before it can be used.")
+ # not that there is anything to open, but the function applies the
+ # options found in the URL
+ self.fromURL(self.port)
+
+ # not that there anything to configure...
+ self._reconfigurePort()
+ # all things set up get, now a clean start
+ self._isOpen = True
+ if not self._rtscts:
+ self.setRTS(True)
+ self.setDTR(True)
+ self.flushInput()
+ self.flushOutput()
+
+ def _reconfigurePort(self):
+ """\
+ Set communication parameters on opened port. For the loop://
+ protocol all settings are ignored!
+ """
+ # not that's it of any real use, but it helps in the unit tests
+ if not isinstance(self._baudrate, (int, long)) or not 0 < self._baudrate < 2**32:
+ raise ValueError("invalid baudrate: %r" % (self._baudrate))
+ if self.logger:
+ self.logger.info('_reconfigurePort()')
+
+ def close(self):
+ """Close port"""
+ if self._isOpen:
+ self._isOpen = False
+ # in case of quick reconnects, give the server some time
+ time.sleep(0.3)
+
+ def makeDeviceName(self, port):
+ raise SerialException("there is no sensible way to turn numbers into URLs")
+
+ def fromURL(self, url):
+ """extract host and port from an URL string"""
+ if url.lower().startswith("loop://"): url = url[7:]
+ try:
+ # process options now, directly altering self
+ for option in url.split('/'):
+ if '=' in option:
+ option, value = option.split('=', 1)
+ else:
+ value = None
+ if not option:
+ pass
+ elif option == 'logging':
+ logging.basicConfig() # XXX is that good to call it here?
+ self.logger = logging.getLogger('pySerial.loop')
+ self.logger.setLevel(LOGGER_LEVELS[value])
+ self.logger.debug('enabled logging')
+ else:
+ raise ValueError('unknown option: %r' % (option,))
+ except ValueError, e:
+ raise SerialException('expected a string in the form "[loop://][option[/option...]]": %s' % e)
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
+
+ def inWaiting(self):
+ """Return the number of characters currently in the input buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ # attention the logged value can differ from return value in
+ # threaded environments...
+ self.logger.debug('inWaiting() -> %d' % (len(self.loop_buffer),))
+ return len(self.loop_buffer)
+
+ def read(self, size=1):
+ """\
+ Read size bytes from the serial port. If a timeout is set it may
+ return less characters as requested. With no timeout it will block
+ until the requested number of bytes is read.
+ """
+ if not self._isOpen: raise portNotOpenError
+ if self._timeout is not None:
+ timeout = time.time() + self._timeout
+ else:
+ timeout = None
+ data = bytearray()
+ while size > 0:
+ self.buffer_lock.acquire()
+ try:
+ block = to_bytes(self.loop_buffer[:size])
+ del self.loop_buffer[:size]
+ finally:
+ self.buffer_lock.release()
+ data += block
+ size -= len(block)
+ # check for timeout now, after data has been read.
+ # useful for timeout = 0 (non blocking) read
+ if timeout and time.time() > timeout:
+ break
+ return bytes(data)
+
+ def write(self, data):
+ """\
+ Output the given string over the serial port. Can block if the
+ connection is blocked. May raise SerialException if the connection is
+ closed.
+ """
+ if not self._isOpen: raise portNotOpenError
+ # ensure we're working with bytes
+ data = to_bytes(data)
+ # calculate aprox time that would be used to send the data
+ time_used_to_send = 10.0*len(data) / self._baudrate
+ # when a write timeout is configured check if we would be successful
+ # (not sending anything, not even the part that would have time)
+ if self._writeTimeout is not None and time_used_to_send > self._writeTimeout:
+ time.sleep(self._writeTimeout) # must wait so that unit test succeeds
+ raise writeTimeoutError
+ self.buffer_lock.acquire()
+ try:
+ self.loop_buffer += data
+ finally:
+ self.buffer_lock.release()
+ return len(data)
+
+ def flushInput(self):
+ """Clear input buffer, discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('flushInput()')
+ self.buffer_lock.acquire()
+ try:
+ del self.loop_buffer[:]
+ finally:
+ self.buffer_lock.release()
+
+ def flushOutput(self):
+ """\
+ Clear output buffer, aborting the current output and
+ discarding all that is in the buffer.
+ """
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('flushOutput()')
+
+ def sendBreak(self, duration=0.25):
+ """\
+ Send break condition. Timed, returns to idle state after given
+ duration.
+ """
+ if not self._isOpen: raise portNotOpenError
+
+ def setBreak(self, level=True):
+ """\
+ Set break: Controls TXD. When active, to transmitting is
+ possible.
+ """
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('setBreak(%r)' % (level,))
+
+ def setRTS(self, level=True):
+ """Set terminal status line: Request To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('setRTS(%r) -> state of CTS' % (level,))
+ self.cts = level
+
+ def setDTR(self, level=True):
+ """Set terminal status line: Data Terminal Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('setDTR(%r) -> state of DSR' % (level,))
+ self.dsr = level
+
+ def getCTS(self):
+ """Read terminal status line: Clear To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,))
+ return self.cts
+
+ def getDSR(self):
+ """Read terminal status line: Data Set Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,))
+ return self.dsr
+
+ def getRI(self):
+ """Read terminal status line: Ring Indicator"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getRI()')
+ return False
+
+ def getCD(self):
+ """Read terminal status line: Carrier Detect"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCD()')
+ return True
+
+ # - - - platform specific - - -
+ # None so far
+
+
+# assemble Serial class with the platform specific implementation and the base
+# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
+# library, derive from io.RawIOBase
+try:
+ import io
+except ImportError:
+ # classic version with our own file-like emulation
+ class Serial(LoopbackSerial, FileLike):
+ pass
+else:
+ # io library present
+ class Serial(LoopbackSerial, io.RawIOBase):
+ pass
+
+
+# simple client test
+if __name__ == '__main__':
+ import sys
+ s = Serial('loop://')
+ sys.stdout.write('%s\n' % s)
+
+ sys.stdout.write("write...\n")
+ s.write("hello\n")
+ s.flush()
+ sys.stdout.write("read: %s\n" % s.read(5))
+
+ s.close()
diff --git a/scripts/serial/urlhandler/protocol_rfc2217.py b/scripts/serial/urlhandler/protocol_rfc2217.py
new file mode 100644
index 0000000000..981ba45fea
--- /dev/null
+++ b/scripts/serial/urlhandler/protocol_rfc2217.py
@@ -0,0 +1,11 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see ../__init__.py
+#
+# This is a thin wrapper to load the rfc2271 implementation.
+#
+# (C) 2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+from serial.rfc2217 import Serial
diff --git a/scripts/serial/urlhandler/protocol_socket.py b/scripts/serial/urlhandler/protocol_socket.py
new file mode 100644
index 0000000000..dc5992342c
--- /dev/null
+++ b/scripts/serial/urlhandler/protocol_socket.py
@@ -0,0 +1,291 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# This module implements a simple socket based client.
+# It does not support changing any port parameters and will silently ignore any
+# requests to do so.
+#
+# The purpose of this module is that applications using pySerial can connect to
+# TCP/IP to serial port converters that do not support RFC 2217.
+#
+# (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# URL format: socket://<host>:<port>[/option[/option...]]
+# options:
+# - "debug" print diagnostic messages
+
+from serial.serialutil import *
+import time
+import socket
+import select
+import logging
+
+# map log level names to constants. used in fromURL()
+LOGGER_LEVELS = {
+ 'debug': logging.DEBUG,
+ 'info': logging.INFO,
+ 'warning': logging.WARNING,
+ 'error': logging.ERROR,
+ }
+
+POLL_TIMEOUT = 2
+
+class SocketSerial(SerialBase):
+ """Serial port implementation for plain sockets."""
+
+ BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
+ 9600, 19200, 38400, 57600, 115200)
+
+ def open(self):
+ """\
+ Open port with current settings. This may throw a SerialException
+ if the port cannot be opened.
+ """
+ self.logger = None
+ if self._port is None:
+ raise SerialException("Port must be configured before it can be used.")
+ if self._isOpen:
+ raise SerialException("Port is already open.")
+ try:
+ # XXX in future replace with create_connection (py >=2.6)
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(self.fromURL(self.portstr))
+ except Exception, msg:
+ self._socket = None
+ raise SerialException("Could not open port %s: %s" % (self.portstr, msg))
+
+ self._socket.settimeout(POLL_TIMEOUT) # used for write timeout support :/
+
+ # not that there anything to configure...
+ self._reconfigurePort()
+ # all things set up get, now a clean start
+ self._isOpen = True
+ if not self._rtscts:
+ self.setRTS(True)
+ self.setDTR(True)
+ self.flushInput()
+ self.flushOutput()
+
+ def _reconfigurePort(self):
+ """\
+ Set communication parameters on opened port. For the socket://
+ protocol all settings are ignored!
+ """
+ if self._socket is None:
+ raise SerialException("Can only operate on open ports")
+ if self.logger:
+ self.logger.info('ignored port configuration change')
+
+ def close(self):
+ """Close port"""
+ if self._isOpen:
+ if self._socket:
+ try:
+ self._socket.shutdown(socket.SHUT_RDWR)
+ self._socket.close()
+ except:
+ # ignore errors.
+ pass
+ self._socket = None
+ self._isOpen = False
+ # in case of quick reconnects, give the server some time
+ time.sleep(0.3)
+
+ def makeDeviceName(self, port):
+ raise SerialException("there is no sensible way to turn numbers into URLs")
+
+ def fromURL(self, url):
+ """extract host and port from an URL string"""
+ if url.lower().startswith("socket://"): url = url[9:]
+ try:
+ # is there a "path" (our options)?
+ if '/' in url:
+ # cut away options
+ url, options = url.split('/', 1)
+ # process options now, directly altering self
+ for option in options.split('/'):
+ if '=' in option:
+ option, value = option.split('=', 1)
+ else:
+ value = None
+ if option == 'logging':
+ logging.basicConfig() # XXX is that good to call it here?
+ self.logger = logging.getLogger('pySerial.socket')
+ self.logger.setLevel(LOGGER_LEVELS[value])
+ self.logger.debug('enabled logging')
+ else:
+ raise ValueError('unknown option: %r' % (option,))
+ # get host and port
+ host, port = url.split(':', 1) # may raise ValueError because of unpacking
+ port = int(port) # and this if it's not a number
+ if not 0 <= port < 65536: raise ValueError("port not in range 0...65535")
+ except ValueError, e:
+ raise SerialException('expected a string in the form "[rfc2217://]<host>:<port>[/option[/option...]]": %s' % e)
+ return (host, port)
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
+
+ def inWaiting(self):
+ """Return the number of characters currently in the input buffer."""
+ if not self._isOpen: raise portNotOpenError
+ # Poll the socket to see if it is ready for reading.
+ # If ready, at least one byte will be to read.
+ lr, lw, lx = select.select([self._socket], [], [], 0)
+ return len(lr)
+
+ def read(self, size=1):
+ """\
+ Read size bytes from the serial port. If a timeout is set it may
+ return less characters as requested. With no timeout it will block
+ until the requested number of bytes is read.
+ """
+ if not self._isOpen: raise portNotOpenError
+ data = bytearray()
+ if self._timeout is not None:
+ timeout = time.time() + self._timeout
+ else:
+ timeout = None
+ while len(data) < size and (timeout is None or time.time() < timeout):
+ try:
+ # an implementation with internal buffer would be better
+ # performing...
+ t = time.time()
+ block = self._socket.recv(size - len(data))
+ duration = time.time() - t
+ if block:
+ data.extend(block)
+ else:
+ # no data -> EOF (connection probably closed)
+ break
+ except socket.timeout:
+ # just need to get out of recv from time to time to check if
+ # still alive
+ continue
+ except socket.error, e:
+ # connection fails -> terminate loop
+ raise SerialException('connection failed (%s)' % e)
+ return bytes(data)
+
+ def write(self, data):
+ """\
+ Output the given string over the serial port. Can block if the
+ connection is blocked. May raise SerialException if the connection is
+ closed.
+ """
+ if not self._isOpen: raise portNotOpenError
+ try:
+ self._socket.sendall(to_bytes(data))
+ except socket.error, e:
+ # XXX what exception if socket connection fails
+ raise SerialException("socket connection failed: %s" % e)
+ return len(data)
+
+ def flushInput(self):
+ """Clear input buffer, discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushInput')
+
+ def flushOutput(self):
+ """\
+ Clear output buffer, aborting the current output and
+ discarding all that is in the buffer.
+ """
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushOutput')
+
+ def sendBreak(self, duration=0.25):
+ """\
+ Send break condition. Timed, returns to idle state after given
+ duration.
+ """
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored sendBreak(%r)' % (duration,))
+
+ def setBreak(self, level=True):
+ """Set break: Controls TXD. When active, to transmitting is
+ possible."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setBreak(%r)' % (level,))
+
+ def setRTS(self, level=True):
+ """Set terminal status line: Request To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setRTS(%r)' % (level,))
+
+ def setDTR(self, level=True):
+ """Set terminal status line: Data Terminal Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setDTR(%r)' % (level,))
+
+ def getCTS(self):
+ """Read terminal status line: Clear To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCTS()')
+ return True
+
+ def getDSR(self):
+ """Read terminal status line: Data Set Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getDSR()')
+ return True
+
+ def getRI(self):
+ """Read terminal status line: Ring Indicator"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getRI()')
+ return False
+
+ def getCD(self):
+ """Read terminal status line: Carrier Detect"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCD()')
+ return True
+
+ # - - - platform specific - - -
+
+ # works on Linux and probably all the other POSIX systems
+ def fileno(self):
+ """Get the file handle of the underlying socket for use with select"""
+ return self._socket.fileno()
+
+
+# assemble Serial class with the platform specific implementation and the base
+# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
+# library, derive from io.RawIOBase
+try:
+ import io
+except ImportError:
+ # classic version with our own file-like emulation
+ class Serial(SocketSerial, FileLike):
+ pass
+else:
+ # io library present
+ class Serial(SocketSerial, io.RawIOBase):
+ pass
+
+
+# simple client test
+if __name__ == '__main__':
+ import sys
+ s = Serial('socket://localhost:7000')
+ sys.stdout.write('%s\n' % s)
+
+ sys.stdout.write("write...\n")
+ s.write("hello\n")
+ s.flush()
+ sys.stdout.write("read: %s\n" % s.read(5))
+
+ s.close()