terminal/terminal.py
Jim Paris 9838c8136f Add option to force DTS and RTS pins high
This is for development boards like those for the ESP32, where it's
common to use RTS and DTR to control chip resets.
2020-09-25 14:35:18 -04:00

426 lines
15 KiB
Python
Executable File

#!/usr/bin/python
# Jim Paris <jim@jtan.com>
# Simple terminal program for serial devices. Supports setting
# baudrates and simple LF->CRLF mapping on input, and basic
# flow control, but nothing fancy.
# ^C quits. There is no escaping, so you can't currently send this
# character to the remote host. Piping input or output should work.
# Supports multiple serial devices simultaneously. When using more
# than one, each device's output is in a different color. Input
# is directed to the first device, or can be sent to all devices
# with --all.
import sys
import os
import serial
import threading
import traceback
import time
import signal
import fcntl
import string
import re
# Need OS-specific method for getting keyboard input.
if os.name == 'nt':
import msvcrt
class Console:
def __init__(self, bufsize = 1):
# Buffer size > 1 not supported on Windows
self.tty = True
def cleanup(self):
pass
def getkey(self):
start = time.time()
while True:
z = msvcrt.getch()
if z == '\0' or z == '\xe0': # function keys
msvcrt.getch()
else:
if z == '\r':
return '\n'
return z
if (time.time() - start) > 0.1:
return None
class MySerial(serial.Serial):
def nonblocking_read(self, size=1):
# Buffer size > 1 not supported on Windows
return self.read(1)
elif os.name == 'posix':
import termios, select, errno
class Console:
def __init__(self, bufsize = 65536):
self.bufsize = bufsize
self.fd = sys.stdin.fileno()
if os.isatty(self.fd):
self.tty = True
self.old = termios.tcgetattr(self.fd)
tc = termios.tcgetattr(self.fd)
tc[3] = tc[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
tc[6][termios.VMIN] = 1
tc[6][termios.VTIME] = 0
termios.tcsetattr(self.fd, termios.TCSANOW, tc)
else:
self.tty = False
def cleanup(self):
if self.tty:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
def getkey(self):
# Return -1 if we don't get input in 0.1 seconds, so that
# the main code can check the "alive" flag and respond to SIGINT.
[r, w, x] = select.select([self.fd], [], [self.fd], 0.1)
if r:
return os.read(self.fd, self.bufsize)
elif x:
return ''
else:
return None
class MySerial(serial.Serial):
def nonblocking_read(self, size=1):
[r, w, x] = select.select([self.fd], [], [self.fd], self._timeout)
if r:
try:
return os.read(self.fd, size)
except OSError as e:
if e.errno == errno.EAGAIN:
return None
raise
elif x:
raise SerialException("exception (device disconnected?)")
else:
return None # timeout
else:
raise ("Sorry, no terminal implementation for your platform (%s) "
"available." % sys.platform)
class JimtermColor(object):
def __init__(self):
self.setup(1)
def setup(self, total):
if total > 1:
self.codes = [
"\x1b[1;36m", # cyan
"\x1b[1;33m", # yellow
"\x1b[1;35m", # magenta
"\x1b[1;31m", # red
"\x1b[1;32m", # green
"\x1b[1;34m", # blue
"\x1b[1;37m", # white
]
self.reset = "\x1b[0m"
else:
self.codes = [""]
self.reset = ""
def code(self, n):
return self.codes[n % len(self.codes)]
class Jimterm:
"""Normal interactive terminal"""
def __init__(self,
serials,
suppress_write_bytes = None,
suppress_read_firstnull = True,
transmit_all = False,
add_cr = False,
send_cr = False,
raw = True,
color = True,
bufsize = 65536):
self.color = JimtermColor()
if color:
self.color.setup(len(serials))
self.serials = serials
self.suppress_write_bytes = suppress_write_bytes
self.suppress_read_firstnull = suppress_read_firstnull
self.last_color = ""
self.threads = []
self.transmit_all = transmit_all
self.add_cr = add_cr
self.send_cr = send_cr
self.raw = raw
self.bufsize = bufsize
self.quote_re = None
self.output_lock = threading.Lock()
def print_header(self, nodes, bauds, output = sys.stdout):
for (n, (node, baud)) in enumerate(zip(nodes, bauds)):
output.write(self.color.code(n)
+ node + ", " + str(baud) + " baud"
+ self.color.reset + "\n")
if sys.stdin.isatty():
output.write("^C to exit\n")
output.write("----------\n")
output.flush()
def start(self):
self.alive = True
# Set up console
self.console = Console(self.bufsize)
# serial->console, all devices
for (n, serial) in enumerate(self.serials):
self.threads.append(threading.Thread(
target = self.reader,
args = (serial, self.color.code(n))
))
# console->serial
self.threads.append(threading.Thread(target = self.writer))
# start all threads
for thread in self.threads:
thread.daemon = True
thread.start()
def stop(self):
self.alive = False
def join(self):
for thread in self.threads:
while thread.isAlive():
thread.join(0.1)
def quote_raw(self, data):
if self.quote_re is None:
matcher = '[^%s]' % re.escape(string.printable + "\b")
if sys.version_info < (3,):
self.quote_re = re.compile(matcher)
qf = lambda x: ("\\x%02x" % ord(x.group(0)))
else:
self.quote_re = re.compile(matcher.encode('ascii'))
qf = lambda x: ("\\x%02x" % ord(x.group(0))).encode('ascii')
self.quote_func = qf
return self.quote_re.sub(self.quote_func, data)
def reader(self, serial, color):
"""loop and copy serial->console"""
first = True
try:
if (sys.version_info < (3,)):
null = '\x00'
else:
null = b'\x00'
while self.alive:
data = serial.nonblocking_read(self.bufsize)
if data is None:
continue
if not len(data):
raise Exception("read returned EOF")
# don't print a NULL if it's the first character we
# read. This hides startup/port-opening glitches with
# some serial devices.
if self.suppress_read_firstnull and first and data[0] == null:
first = False
data = data[1:]
first = False
self.output_lock.acquire()
if color != self.last_color:
self.last_color = color
os.write(sys.stdout.fileno(), color)
if self.add_cr:
if sys.version_info < (3,):
data = data.replace('\n', '\r\n')
else:
data = data.replace(b'\n', b'\r\n')
if not self.raw:
data = self.quote_raw(data)
os.write(sys.stdout.fileno(), data)
self.output_lock.release()
except Exception as e:
self.console.cleanup()
sys.stdout.write(color)
sys.stdout.flush()
traceback.print_exc()
sys.stdout.write(self.color.reset)
sys.stdout.flush()
os._exit(1)
def writer(self):
"""loop and copy console->serial until ^C"""
try:
if (sys.version_info < (3,)):
ctrlc = '\x03'
else:
ctrlc = b'\x03'
while self.alive:
try:
c = self.console.getkey()
except KeyboardInterrupt:
self.stop()
return
if c is None:
# No input, try again.
continue
elif self.console.tty and ctrlc in c:
# Try to catch ^C that didn't trigger KeyboardInterrupt
self.stop()
return
elif c == '':
# Probably EOF on input. Wait a tiny bit so we can
# flush the remaining input, then stop.
time.sleep(0.25)
self.stop()
return
else:
# Remove bytes we don't want to send
if self.suppress_write_bytes is not None:
c = c.translate(None, self.suppress_write_bytes)
if self.send_cr and c == '\n':
c = '\r'
# Send character
if self.transmit_all:
for serial in self.serials:
serial.write(c)
else:
self.serials[0].write(c)
except Exception as e:
self.console.cleanup()
sys.stdout.write(self.color.reset)
sys.stdout.flush()
traceback.print_exc()
os._exit(1)
def run(self):
# Set all serial port timeouts to 0.1 sec
saved_timeouts = []
for serial in self.serials:
saved_timeouts.append(serial.timeout)
serial.timeout = 0.1
# Work around https://sourceforge.net/p/pyserial/bugs/151/
saved_writeTimeouts = []
for serial in self.serials:
saved_writeTimeouts.append(serial.writeTimeout)
serial.writeTimeout = 1000000
# Handle SIGINT gracefully
signal.signal(signal.SIGINT, lambda *args: self.stop())
# Go
self.start()
self.join()
# Restore serial port timeouts
for (serial, saved) in zip(self.serials, saved_timeouts):
serial.timeout = saved
for (serial, saved) in zip(self.serials, saved_writeTimeouts):
serial.writeTimeout = saved
# Cleanup
sys.stdout.write(self.color.reset + "\n")
self.console.cleanup()
if __name__ == "__main__":
import argparse
import re
formatter = argparse.ArgumentDefaultsHelpFormatter
description = ("Simple serial terminal that supports multiple devices. "
"If more than one device is specified, device output is "
"shown in varying colors. All input goes to the "
"first device.")
parser = argparse.ArgumentParser(description = description,
formatter_class = formatter)
parser.add_argument("device", metavar="DEVICE", nargs="+",
help="Serial device. Specify DEVICE@BAUD for "
"per-device baudrates.")
parser.add_argument("--quiet", "-q", action="store_true",
help="Don't print header")
parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
help="Default baudrate for all devices", default=115200)
parser.add_argument("--crlf", "-c", action="store_true",
help="Add CR before incoming LF")
parser.add_argument("--lfcr", "-C", action="store_true",
help="Send CR instead of LF on output")
parser.add_argument("--all", "-a", action="store_true",
help="Send keystrokes to all devices, not just "
"the first one")
parser.add_argument("--mono", "-m", action="store_true",
help="Don't use colors in output")
parser.add_argument("--flow", "-f", action="store_true",
help="Enable RTS/CTS flow control")
parser.add_argument("--esp", "-e", action="store_true",
help="Force RTS and DTR high, for ESP boards. Note that"
" the lines may still glitch low at startup.")
parser.add_argument("--bufsize", "-z", metavar="SIZE", type=int,
help="Buffer size for reads and writes", default=65536)
group = parser.add_mutually_exclusive_group(required = False)
group.add_argument("--raw", "-r", action="store_true",
default=argparse.SUPPRESS,
help="Output characters directly "
"(default, if stdout is not a tty)")
group.add_argument("--no-raw", "-R", action="store_true",
default=argparse.SUPPRESS,
help="Quote unprintable characters "
"(default, if stdout is a tty)")
args = parser.parse_args()
piped = not sys.stdout.isatty()
raw = "raw" in args or (piped and "no_raw" not in args)
devs = []
nodes = []
bauds = []
for (n, device) in enumerate(args.device):
m = re.search(r"^(.*)@([1-9][0-9]*)$", device)
if m is not None:
node = m.group(1)
baud = m.group(2)
else:
node = device
baud = args.baudrate
if node in nodes:
sys.stderr.write("error: %s specified more than once\n" % node)
raise SystemExit(1)
try:
dev = MySerial(None, baud, rtscts = args.flow)
dev.port = node
if args.esp:
# Force DTR and RTS high by setting to false
dev.dtr = False
dev.rts = False
dev.open()
except serial.serialutil.SerialException:
sys.stderr.write("error opening %s\n" % node)
raise SystemExit(1)
nodes.append(node)
bauds.append(baud)
devs.append(dev)
term = Jimterm(devs,
transmit_all = args.all,
add_cr = args.crlf,
send_cr = args.lfcr,
raw = raw,
color = (os.name == "posix" and not args.mono),
bufsize = args.bufsize)
if not args.quiet:
term.print_header(nodes, bauds, sys.stderr)
term.run()