|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- #!/usr/bin/python
-
- # Jim Paris <jim@jtan.com>
-
- # Simple terminal program for serial devices. Supports setting
- # baudrates and simple LF->CRLF mapping on input, and basic
- # flow control, but nothing fancy.
-
- # ^C quits. There is no escaping, so you can't currently send this
- # character to the remote host. Piping input or output should work.
-
- # Supports multiple serial devices simultaneously. When using more
- # than one, each device's output is in a different color. Input
- # is directed to the first device, or can be sent to all devices
- # with --all.
-
- import sys
- import os
- import serial
- import threading
- import traceback
- import time
- import signal
- import fcntl
- import string
- import re
-
- # Need OS-specific method for getting keyboard input.
- if os.name == 'nt':
- import msvcrt
class Console:
    """Keyboard input for Windows consoles, built on ``msvcrt``.

    Exposes the same interface as the POSIX console class: a ``tty``
    attribute, ``cleanup()`` and ``getkey()``.
    """

    def __init__(self, bufsize=1):
        # Buffer size > 1 not supported on Windows: keys are read one at a
        # time, so ``bufsize`` is accepted only for interface compatibility.
        self.tty = True

    def cleanup(self):
        """Do nothing; this class never changes the console mode."""
        pass

    def getkey(self):
        """Return one key press, or None if none arrives within ~0.1 s.

        Carriage return is mapped to line feed.  Function/arrow keys arrive
        as a two-part sequence (a ``\\0`` or ``\\xe0`` prefix followed by a
        key code); both parts are discarded.

        The value has whatever type ``msvcrt.getch()`` returns (a byte
        string).  Comparisons use ``b''`` literals so they match on both
        Python 2 and Python 3.
        """
        deadline = time.time() + 0.1
        while True:
            # getch() blocks until a key is pressed, so only call it once
            # kbhit() reports pending input; otherwise the timeout below
            # could never fire and the caller's loop would stall.
            if msvcrt.kbhit():
                z = msvcrt.getch()
                if z in (b'\0', b'\xe0'):  # function keys
                    msvcrt.getch()
                elif z == b'\r':
                    return b'\n'
                else:
                    return z
            else:
                time.sleep(0.01)
            if time.time() >= deadline:
                return None
-
class MySerial(serial.Serial):
    """Serial port exposing ``nonblocking_read`` (Windows variant)."""

    def nonblocking_read(self, size=1):
        """Read a single byte, returning None when nothing was received.

        ``size`` is ignored.  Buffer size > 1 not supported on Windows.

        An empty result from ``read`` is reported as None rather than as an
        empty string, because Jimterm.reader treats None as "no data yet"
        and an empty string as end-of-file.
        NOTE(review): assumes an empty read means the port timeout expired,
        not that the device went away -- confirm against pySerial.
        """
        data = self.read(1)
        if not data:
            return None
        return data
- elif os.name == 'posix':
- import termios, select, errno
class Console:
    """Keyboard/stdin input for POSIX systems.

    When stdin is a terminal, canonical mode, echo and signal generation
    (ICANON, ECHO, ISIG) are switched off and reads are set to return as
    soon as one byte is available (VMIN=1, VTIME=0).  The previous terminal
    attributes are kept in ``self.old`` so ``cleanup()`` can restore them.
    When stdin is not a terminal, nothing is changed.
    """

    def __init__(self, bufsize = 65536):
        self.bufsize = bufsize
        self.fd = sys.stdin.fileno()
        if not os.isatty(self.fd):
            self.tty = False
            return

        self.tty = True
        self.old = termios.tcgetattr(self.fd)
        attrs = termios.tcgetattr(self.fd)
        # Index 3 holds the local-mode flags, index 6 the control characters.
        attrs[3] = attrs[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
        attrs[6][termios.VMIN] = 1
        attrs[6][termios.VTIME] = 0
        termios.tcsetattr(self.fd, termios.TCSANOW, attrs)

    def cleanup(self):
        """Restore the saved terminal attributes (only if stdin is a tty)."""
        if not self.tty:
            return
        termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)

    def getkey(self):
        """Wait up to 0.1 seconds for input on stdin.

        Returns the data read (at most ``bufsize`` bytes) when stdin is
        readable, '' when select() flags an exceptional condition on it,
        and None on timeout.  The short timeout lets the caller's loop
        check the "alive" flag and respond to SIGINT.
        """
        readable, _, exceptional = select.select(
            [self.fd], [], [self.fd], 0.1)
        if readable:
            return os.read(self.fd, self.bufsize)
        if exceptional:
            return ''
        return None
-
class MySerial(serial.Serial):
    """Serial port exposing ``nonblocking_read`` (POSIX variant)."""

    def nonblocking_read(self, size=1):
        """Read up to ``size`` bytes, waiting at most the port timeout.

        Returns the bytes read; an empty result is passed through
        unchanged.  Returns None when the wait times out or the read
        reports EAGAIN.

        Raises serial.SerialException when select() flags an exceptional
        condition on the descriptor, and re-raises any OSError other than
        EAGAIN.

        NOTE(review): relies on ``self.fd`` and ``self._timeout``, which are
        presumably provided by pySerial's POSIX implementation -- confirm
        against the installed pySerial version.
        """
        readable, _, exceptional = select.select(
            [self.fd], [], [self.fd], self._timeout)
        if readable:
            try:
                return os.read(self.fd, size)
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    return None
                raise
        if exceptional:
            # Only the ``serial`` module is imported by this file, so the
            # exception class must be referenced through it.
            raise serial.SerialException("exception (device disconnected?)")
        return None  # timeout
-
- else:
# Neither 'nt' nor 'posix': there is no Console/MySerial implementation.
# Raise a real exception class; raising a plain string is itself a TypeError
# and would hide this message.
raise NotImplementedError("Sorry, no terminal implementation for your "
                          "platform (%s) available." % sys.platform)
-
class JimtermColor(object):
    """Table of ANSI color escape codes, one per device.

    With a single device the table holds one empty string and ``reset`` is
    empty, so no escape sequences are produced.
    """

    def __init__(self):
        # Start in the single-device (colorless) configuration.
        self.setup(1)

    def setup(self, total):
        """Build the code table for ``total`` devices."""
        multiple = total > 1
        self.reset = "\x1b[0m" if multiple else ""
        if not multiple:
            self.codes = [""]
            return
        self.codes = [
            "\x1b[1;36m", # cyan
            "\x1b[1;33m", # yellow
            "\x1b[1;35m", # magenta
            "\x1b[1;31m", # red
            "\x1b[1;32m", # green
            "\x1b[1;34m", # blue
            "\x1b[1;37m", # white
        ]

    def code(self, n):
        """Return the code for device index ``n``, wrapping around the table."""
        index = n % len(self.codes)
        return self.codes[index]
-
class Jimterm:
    """Normal interactive terminal.

    One reader thread per serial device copies incoming data to stdout,
    each device in its own color.  One writer thread copies console input
    to the first device, or to every device when ``transmit_all`` is set.
    """

    def __init__(self,
                 serials,
                 suppress_write_bytes=None,
                 suppress_read_firstnull=True,
                 transmit_all=False,
                 add_cr=False,
                 send_cr=False,
                 raw=True,
                 color=True,
                 bufsize=65536):
        """Store the configuration; no threads or console are set up yet.

        serials                 -- opened serial objects providing
                                   ``nonblocking_read`` and ``write``
        suppress_write_bytes    -- bytes to delete from console input
                                   before sending, or None
        suppress_read_firstnull -- drop a NUL if it is the very first byte
                                   read from a device
        transmit_all            -- send input to all devices, not just the
                                   first
        add_cr                  -- turn incoming LF into CR LF
        send_cr                 -- send CR instead of LF
        raw                     -- if false, quote unprintable bytes as \\xNN
        color                   -- use a different color per device
        bufsize                 -- read size for console and serial reads
        """
        self.color = JimtermColor()
        if color:
            self.color.setup(len(serials))

        self.serials = serials
        self.suppress_write_bytes = suppress_write_bytes
        self.suppress_read_firstnull = suppress_read_firstnull
        self.last_color = ""
        self.threads = []
        self.transmit_all = transmit_all
        self.add_cr = add_cr
        self.send_cr = send_cr
        self.raw = raw
        self.bufsize = bufsize
        self.quote_re = None
        self.output_lock = threading.Lock()

    def print_header(self, nodes, bauds, output = sys.stdout):
        """Write one "<node>, <baud> baud" line per device, then a rule.

        Each line is wrapped in that device's color.  The "^C to exit" hint
        is only shown when stdin is a terminal.
        """
        for (n, (node, baud)) in enumerate(zip(nodes, bauds)):
            output.write(self.color.code(n)
                         + node + ", " + str(baud) + " baud"
                         + self.color.reset + "\n")
        if sys.stdin.isatty():
            output.write("^C to exit\n")
        output.write("----------\n")
        output.flush()

    def start(self):
        """Create the console and start all reader/writer threads."""
        self.alive = True

        # Set up console
        self.console = Console(self.bufsize)

        # serial->console, all devices
        for (n, port) in enumerate(self.serials):
            self.threads.append(threading.Thread(
                target=self.reader,
                args=(port, self.color.code(n))
            ))

        # console->serial
        self.threads.append(threading.Thread(target=self.writer))

        # start all threads
        for thread in self.threads:
            thread.daemon = True
            thread.start()

    def stop(self):
        """Ask all threads to finish; they poll ``self.alive``."""
        self.alive = False

    def join(self):
        """Block until every thread has finished.

        Joins with a short timeout in a loop instead of blocking
        indefinitely in a single join call.
        """
        for thread in self.threads:
            # Thread.isAlive() was removed in Python 3.9; is_alive() is
            # available on both Python 2.6+ and Python 3.
            while thread.is_alive():
                thread.join(0.1)

    def quote_raw(self, data):
        """Return ``data`` with every byte outside ``string.printable`` and
        backspace replaced by a literal ``\\xNN`` escape.

        The regular expression is compiled on first use.  The pattern and
        replacement are ASCII-encoded so they are byte strings on Python 3
        (on Python 2 the encode is a no-op for these ASCII-only strings).
        """
        if self.quote_re is None:
            matcher = '[^%s]' % re.escape(string.printable + "\b")
            # Assign the replacement function before the pattern so that
            # quote_re never appears set while quote_func is still missing.
            self.quote_func = (
                lambda m: ("\\x%02x" % ord(m.group(0))).encode('ascii'))
            self.quote_re = re.compile(matcher.encode('ascii'))
        return self.quote_re.sub(self.quote_func, data)

    def reader(self, serial, color):
        """loop and copy serial->console

        Runs until ``self.alive`` is cleared.  Any exception (including an
        empty read, treated as EOF) restores the console, prints the
        traceback in this device's color and terminates the whole process
        with exit status 1.
        """
        first = True
        try:
            stdout_fd = sys.stdout.fileno()
            # os.write needs a byte string; on Python 3 the color code is
            # text and must be encoded first.
            if isinstance(color, bytes):
                color_bytes = color
            else:
                color_bytes = color.encode('ascii')

            while self.alive:
                data = serial.nonblocking_read(self.bufsize)
                if data is None:
                    continue
                if not len(data):
                    raise Exception("read returned EOF")

                # don't print a NULL if it's the first character we
                # read.  This hides startup/port-opening glitches with
                # some serial devices.  Slice rather than index: indexing
                # bytes yields an int on Python 3 and would never match.
                if first:
                    first = False
                    if self.suppress_read_firstnull and data[:1] == b'\x00':
                        data = data[1:]

                # Hold the lock for the whole color+data write so output
                # from different devices is not interleaved mid-chunk.
                with self.output_lock:
                    if color != self.last_color:
                        self.last_color = color
                        os.write(stdout_fd, color_bytes)

                    if self.add_cr:
                        data = data.replace(b'\n', b'\r\n')

                    if not self.raw:
                        data = self.quote_raw(data)

                    os.write(stdout_fd, data)

        except Exception:
            self.console.cleanup()
            sys.stdout.write(color)
            sys.stdout.flush()
            traceback.print_exc()
            sys.stdout.write(self.color.reset)
            sys.stdout.flush()
            # A reader failure is fatal for the whole program; exit without
            # waiting for the other threads.
            os._exit(1)

    def writer(self):
        """loop and copy console->serial until ^C

        Stops on KeyboardInterrupt, on a ^C byte typed at a terminal, or
        when the console returns an empty read.  Any other exception
        restores the console, prints the traceback and terminates the
        process with exit status 1.
        """
        try:
            ctrlc = b'\x03'
            while self.alive:
                try:
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    self.stop()
                    return
                if c is None:
                    # No input, try again.
                    continue
                if len(c) == 0:
                    # Probably EOF on input.  Wait a tiny bit so we can
                    # flush the remaining input, then stop.  Checked by
                    # length so both '' and b'' are recognised, and before
                    # the ^C test so an empty text string is never searched
                    # for a bytes needle.
                    time.sleep(0.25)
                    self.stop()
                    return
                if self.console.tty and ctrlc in c:
                    # Try to catch ^C that didn't trigger KeyboardInterrupt
                    self.stop()
                    return

                # Remove bytes we don't want to send
                if self.suppress_write_bytes is not None:
                    c = c.translate(None, self.suppress_write_bytes)

                # Map every LF in the chunk, not only a chunk that is
                # exactly one LF; console reads may return many bytes.
                if self.send_cr:
                    c = c.replace(b'\n', b'\r')

                if not c:
                    # Everything was filtered out; nothing to send.
                    continue

                # Send character(s)
                if self.transmit_all:
                    for port in self.serials:
                        port.write(c)
                else:
                    self.serials[0].write(c)
        except Exception:
            self.console.cleanup()
            sys.stdout.write(self.color.reset)
            sys.stdout.flush()
            traceback.print_exc()
            os._exit(1)

    def run(self):
        """Run the terminal until stopped, then restore port and console
        settings.

        Installs a SIGINT handler that calls ``stop()``; the handler is not
        removed afterwards.
        """
        # Remember the current timeouts so they can be restored.
        saved = [(port, port.timeout, port.writeTimeout)
                 for port in self.serials]
        for port in self.serials:
            # Set all serial port timeouts to 0.1 sec
            port.timeout = 0.1
            # Work around https://sourceforge.net/p/pyserial/bugs/151/
            port.writeTimeout = 1000000

        # Handle SIGINT gracefully
        signal.signal(signal.SIGINT, lambda *args: self.stop())

        try:
            # Go
            self.start()
            self.join()
        finally:
            # Restore serial port timeouts, even if start/join failed
            for (port, timeout, write_timeout) in saved:
                port.timeout = timeout
                port.writeTimeout = write_timeout

        # Cleanup
        sys.stdout.write(self.color.reset + "\n")
        self.console.cleanup()
-
if __name__ == "__main__":
    # Command-line entry point: parse options, open every requested serial
    # device, then run the interactive terminal until it is stopped.
    import argparse
    import re

    formatter = argparse.ArgumentDefaultsHelpFormatter
    description = ("Simple serial terminal that supports multiple devices.  "
                   "If more than one device is specified, device output is "
                   "shown in varying colors.  All input goes to the "
                   "first device.")
    parser = argparse.ArgumentParser(description = description,
                                     formatter_class = formatter)

    parser.add_argument("device", metavar="DEVICE", nargs="+",
                        help="Serial device.  Specify DEVICE@BAUD for "
                        "per-device baudrates.")

    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Don't print header")

    parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
                        help="Default baudrate for all devices",
                        default=115200)
    parser.add_argument("--crlf", "-c", action="store_true",
                        help="Add CR before incoming LF")
    parser.add_argument("--lfcr", "-C", action="store_true",
                        help="Send CR instead of LF on output")
    parser.add_argument("--all", "-a", action="store_true",
                        help="Send keystrokes to all devices, not just "
                        "the first one")
    parser.add_argument("--mono", "-m", action="store_true",
                        help="Don't use colors in output")
    parser.add_argument("--flow", "-f", action="store_true",
                        help="Enable RTS/CTS flow control")
    parser.add_argument("--esp", "-e", action="store_true",
                        help="Force RTS and DTR high, for ESP boards. Note that"
                        " the lines may still glitch low at startup.")
    parser.add_argument("--bufsize", "-z", metavar="SIZE", type=int,
                        help="Buffer size for reads and writes", default=65536)

    # --raw / --no-raw use SUPPRESS as their default so that the attribute
    # only exists on ``args`` when the flag was actually given; that lets
    # the default depend on whether stdout is a terminal.
    group = parser.add_mutually_exclusive_group(required = False)
    group.add_argument("--raw", "-r", action="store_true",
                       default=argparse.SUPPRESS,
                       help="Output characters directly "
                       "(default, if stdout is not a tty)")
    group.add_argument("--no-raw", "-R", action="store_true",
                       default=argparse.SUPPRESS,
                       help="Quote unprintable characters "
                       "(default, if stdout is a tty)")

    args = parser.parse_args()

    piped = not sys.stdout.isatty()
    raw = "raw" in args or (piped and "no_raw" not in args)

    devs = []
    nodes = []
    bauds = []
    for device in args.device:
        # DEVICE@BAUD overrides the default baudrate for that device.
        m = re.search(r"^(.*)@([1-9][0-9]*)$", device)
        if m is not None:
            node = m.group(1)
            # Convert so per-device and default baudrates are both ints.
            baud = int(m.group(2))
        else:
            node = device
            baud = args.baudrate
        if node in nodes:
            sys.stderr.write("error: %s specified more than once\n" % node)
            raise SystemExit(1)
        try:
            # Create the port unopened so the control lines can be set
            # before open() is called.
            dev = MySerial(None, baud, rtscts = args.flow)
            dev.port = node
            if args.esp:
                # Force DTR and RTS high by setting to false
                dev.dtr = False
                dev.rts = False
            dev.open()
        except serial.serialutil.SerialException as e:
            # Include the underlying reason (permissions, missing device,
            # ...) instead of discarding it.
            sys.stderr.write("error opening %s: %s\n" % (node, e))
            raise SystemExit(1)
        nodes.append(node)
        bauds.append(baud)
        devs.append(dev)

    term = Jimterm(devs,
                   transmit_all = args.all,
                   add_cr = args.crlf,
                   send_cr = args.lfcr,
                   raw = raw,
                   color = (os.name == "posix" and not args.mono),
                   bufsize = args.bufsize)
    if not args.quiet:
        term.print_header(nodes, bauds, sys.stderr)

    term.run()
|