Compare commits

...

17 Commits
v1.0 ... master

Author SHA1 Message Date
9838c8136f Add option to force DTS and RTS pins high
This is for development boards like those for the ESP32, where it's
common to use RTS and DTR to control chip resets.
2020-09-25 14:35:18 -04:00
7c536a2d6b Lock output so that terminal escape sequences don't mix 2017-07-24 14:43:43 -04:00
8075e9fe34 Fix typo 2017-04-27 14:12:05 -04:00
44f431db13 Add send_cr option to translate LF to CR on output 2017-04-26 16:44:07 -04:00
598345b792 TODO 2014-11-04 19:44:46 -05:00
0df36bb543 Fix bug in detecting EOF 2013-11-30 18:52:52 -05:00
87a956e72c Consider backspace printable 2013-11-26 15:06:34 -05:00
cfd9bec445 Improve raw quoting; add Python 3 support 2013-07-05 14:00:55 -04:00
6756a87def Fix read/write issues by not using nonblocking mode 2013-07-05 13:18:11 -04:00
ff441d2b22 Fix header, quiet flags 2013-07-05 12:26:39 -04:00
b765576283 Improve serial read speeds by using a larger nonblocking read on POSIX 2013-07-04 23:22:18 -04:00
6c5b85b618 Add configurable read/write buffer size 2013-07-04 22:58:15 -04:00
b029c79e63 Send header to stderr; omit instructions if piped 2013-07-04 22:54:29 -04:00
3839c00406 Switch raw and quiet mode automatically based on TTY
Also add --no-quiet, --no-raw
2013-07-04 22:45:12 -04:00
090959a958 Workaround PySerial bug that causes 100% CPU usage 2013-07-04 20:50:28 -04:00
e1a57cfc35 Cleanup 2013-07-04 18:42:13 -04:00
bd4a7642a9 Significantly speed up writes 2013-07-04 18:33:38 -04:00
2 changed files with 177 additions and 61 deletions

13
TODO.md Normal file
View File

@ -0,0 +1,13 @@
- serial.write() may block for reasons beyond our control. For
example, CDC-ACM devices may do flow control internally and block on
USB transfers; the CDC-ACM interface does not expose flow control
settings to the PC, and writes will hang indefinitely even if
--flow is not used. Since polling the keyboard is in the write loop,
this kills us.
- Support some other escape character so ^C can be optionally used.
Maybe even remove ^C as an option, use ^] exclusively?
- Support reconnection
- See patches from "Guyzmo"

View File

@ -21,17 +21,22 @@ import threading
import traceback
import time
import signal
import fcntl
import string
import re
# Need OS-specific method for getting keyboard input.
if os.name == 'nt':
import msvcrt
class Console:
def __init__(self):
pass
def __init__(self, bufsize = 1):
# Buffer size > 1 not supported on Windows
self.tty = True
def cleanup(self):
pass
def getkey(self):
while 1:
start = time.time()
while True:
z = msvcrt.getch()
if z == '\0' or z == '\xe0': # function keys
msvcrt.getch()
@ -39,37 +44,58 @@ if os.name == 'nt':
if z == '\r':
return '\n'
return z
if (time.time() - start) > 0.1:
return None
class MySerial(serial.Serial):
def nonblocking_read(self, size=1):
# Buffer size > 1 not supported on Windows
return self.read(1)
elif os.name == 'posix':
import termios, select
import termios, select, errno
class Console:
def __init__(self):
def __init__(self, bufsize = 65536):
self.bufsize = bufsize
self.fd = sys.stdin.fileno()
try:
if os.isatty(self.fd):
self.tty = True
self.old = termios.tcgetattr(self.fd)
tc = termios.tcgetattr(self.fd)
tc[3] = tc[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
tc[6][termios.VMIN] = 1
tc[6][termios.VTIME] = 0
termios.tcsetattr(self.fd, termios.TCSANOW, tc)
except termios.error:
# ignore errors, so we can pipe stuff to this script
pass
else:
self.tty = False
def cleanup(self):
try:
if self.tty:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
except:
# ignore errors, so we can pipe stuff to this script
pass
def getkey(self):
# Return -1 if we don't get input in 0.1 seconds, so that
# the main code can check the "alive" flag and respond to SIGINT.
[r, w, x] = select.select([self.fd], [], [self.fd], 0.1)
if r:
return os.read(self.fd, 1)
return os.read(self.fd, self.bufsize)
elif x:
return ''
else:
return -1
return None
class MySerial(serial.Serial):
def nonblocking_read(self, size=1):
[r, w, x] = select.select([self.fd], [], [self.fd], self._timeout)
if r:
try:
return os.read(self.fd, size)
except OSError as e:
if e.errno == errno.EAGAIN:
return None
raise
elif x:
raise SerialException("exception (device disconnected?)")
else:
return None # timeout
else:
raise ("Sorry, no terminal implementation for your platform (%s) "
"available." % sys.platform)
@ -104,33 +130,44 @@ class Jimterm:
suppress_read_firstnull = True,
transmit_all = False,
add_cr = False,
raw = False,
color = True):
send_cr = False,
raw = True,
color = True,
bufsize = 65536):
self.color = JimtermColor()
if color:
self.color.setup(len(serials))
self.serials = serials
self.suppress_write_bytes = suppress_write_bytes or ""
self.suppress_write_bytes = suppress_write_bytes
self.suppress_read_firstnull = suppress_read_firstnull
self.last_color = ""
self.threads = []
self.transmit_all = transmit_all
self.add_cr = add_cr
self.send_cr = send_cr
self.raw = raw
self.bufsize = bufsize
self.quote_re = None
self.output_lock = threading.Lock()
def print_header(self, nodes, bauds):
def print_header(self, nodes, bauds, output = sys.stdout):
for (n, (node, baud)) in enumerate(zip(nodes, bauds)):
print (self.color.code(n)
+ node + ", " + str(baud) + " baud"
+ self.color.reset)
print "^C to exit"
print "----------"
output.write(self.color.code(n)
+ node + ", " + str(baud) + " baud"
+ self.color.reset + "\n")
if sys.stdin.isatty():
output.write("^C to exit\n")
output.write("----------\n")
output.flush()
def start(self):
self.alive = True
# Set up console
self.console = Console(self.bufsize)
# serial->console, all devices
for (n, serial) in enumerate(self.serials):
self.threads.append(threading.Thread(
@ -139,7 +176,6 @@ class Jimterm:
))
# console->serial
self.console = Console()
self.threads.append(threading.Thread(target = self.writer))
# start all threads
@ -155,91 +191,128 @@ class Jimterm:
while thread.isAlive():
thread.join(0.1)
def quote_raw(self, data):
if self.quote_re is None:
matcher = '[^%s]' % re.escape(string.printable + "\b")
if sys.version_info < (3,):
self.quote_re = re.compile(matcher)
qf = lambda x: ("\\x%02x" % ord(x.group(0)))
else:
self.quote_re = re.compile(matcher.encode('ascii'))
qf = lambda x: ("\\x%02x" % ord(x.group(0))).encode('ascii')
self.quote_func = qf
return self.quote_re.sub(self.quote_func, data)
def reader(self, serial, color):
"""loop and copy serial->console"""
first = True
try:
if (sys.version_info < (3,)):
null = '\x00'
else:
null = b'\x00'
while self.alive:
data = serial.read(1)
if not data:
data = serial.nonblocking_read(self.bufsize)
if data is None:
continue
if not len(data):
raise Exception("read returned EOF")
# don't print a NULL if it's the first character we
# read. This hides startup/port-opening glitches with
# some serial devices.
if self.suppress_read_firstnull and first and data == '\0':
if self.suppress_read_firstnull and first and data[0] == null:
first = False
continue
data = data[1:]
first = False
self.output_lock.acquire()
if color != self.last_color:
self.last_color = color
sys.stdout.write(color)
os.write(sys.stdout.fileno(), color)
if (self.raw or
(ord(data) >= 32 and ord(data) < 128) or
data == '\r' or data == '\n' or data == '\t'):
if self.add_cr and data == '\n':
sys.stdout.write('\r' + data)
if self.add_cr:
if sys.version_info < (3,):
data = data.replace('\n', '\r\n')
else:
sys.stdout.write(data)
else:
sys.stdout.write('\\x'+("0"+hex(ord(data))[2:])[-2:])
data = data.replace(b'\n', b'\r\n')
if not self.raw:
data = self.quote_raw(data)
os.write(sys.stdout.fileno(), data)
self.output_lock.release()
sys.stdout.flush()
except Exception as e:
self.console.cleanup()
sys.stdout.write(color)
sys.stdout.flush()
traceback.print_exc()
sys.stdout.write(self.color.reset)
sys.stdout.flush()
self.console.cleanup()
os._exit(1)
def writer(self):
"""loop and copy console->serial until ^C"""
try:
if (sys.version_info < (3,)):
ctrlc = '\x03'
else:
ctrlc = b'\x03'
while self.alive:
try:
c = self.console.getkey()
except KeyboardInterrupt:
c = '\x03'
if c == '\x03':
self.stop()
return
elif c == -1:
# No input, try again
if c is None:
# No input, try again.
continue
elif self.console.tty and ctrlc in c:
# Try to catch ^C that didn't trigger KeyboardInterrupt
self.stop()
return
elif c == '':
# EOF on input. Wait a tiny bit so we can
# Probably EOF on input. Wait a tiny bit so we can
# flush the remaining input, then stop.
time.sleep(0.25)
self.stop()
return
elif c in self.suppress_write_bytes:
# Don't send these bytes
continue
else:
# send character
# Remove bytes we don't want to send
if self.suppress_write_bytes is not None:
c = c.translate(None, self.suppress_write_bytes)
if self.send_cr and c == '\n':
c = '\r'
# Send character
if self.transmit_all:
for serial in self.serials:
serial.write(c)
else:
self.serials[0].write(c)
except Exception as e:
self.console.cleanup()
sys.stdout.write(self.color.reset)
sys.stdout.flush()
traceback.print_exc()
self.console.cleanup()
os._exit(1)
def run(self):
# Set all serial port timeouts to 0.1 sec
saved_timeouts = []
for (n, serial) in enumerate(self.serials):
for serial in self.serials:
saved_timeouts.append(serial.timeout)
serial.timeout = 0.1
# Work around https://sourceforge.net/p/pyserial/bugs/151/
saved_writeTimeouts = []
for serial in self.serials:
saved_writeTimeouts.append(serial.writeTimeout)
serial.writeTimeout = 1000000
# Handle SIGINT gracefully
signal.signal(signal.SIGINT, lambda *args: self.stop())
@ -248,11 +321,13 @@ class Jimterm:
self.join()
# Restore serial port timeouts
for (n, serial) in enumerate(self.serials):
serial.timeout = saved_timeouts[n]
for (serial, saved) in zip(self.serials, saved_timeouts):
serial.timeout = saved
for (serial, saved) in zip(self.serials, saved_writeTimeouts):
serial.writeTimeout = saved
# Cleanup
print self.color.reset # and a newline
sys.stdout.write(self.color.reset + "\n")
self.console.cleanup()
if __name__ == "__main__":
@ -272,23 +347,42 @@ if __name__ == "__main__":
"per-device baudrates.")
parser.add_argument("--quiet", "-q", action="store_true",
help="Less verbose output (omit header)")
help="Don't print header")
parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
help="Default baudrate for all devices", default=115200)
parser.add_argument("--crlf", "-c", action="store_true",
help="Add CR before incoming LF")
parser.add_argument("--lfcr", "-C", action="store_true",
help="Send CR instead of LF on output")
parser.add_argument("--all", "-a", action="store_true",
help="Send keystrokes to all devices, not just "
"the first one")
parser.add_argument("--mono", "-m", action="store_true",
help="Don't use colors in output")
parser.add_argument("--raw", "-r", action="store_true",
help="Don't escape unprintable characters")
parser.add_argument("--flow", "-f", action="store_true",
help="Enable RTS/CTS flow control")
parser.add_argument("--esp", "-e", action="store_true",
help="Force RTS and DTR high, for ESP boards. Note that"
" the lines may still glitch low at startup.")
parser.add_argument("--bufsize", "-z", metavar="SIZE", type=int,
help="Buffer size for reads and writes", default=65536)
group = parser.add_mutually_exclusive_group(required = False)
group.add_argument("--raw", "-r", action="store_true",
default=argparse.SUPPRESS,
help="Output characters directly "
"(default, if stdout is not a tty)")
group.add_argument("--no-raw", "-R", action="store_true",
default=argparse.SUPPRESS,
help="Quote unprintable characters "
"(default, if stdout is a tty)")
args = parser.parse_args()
piped = not sys.stdout.isatty()
raw = "raw" in args or (piped and "no_raw" not in args)
devs = []
nodes = []
bauds = []
@ -304,7 +398,13 @@ if __name__ == "__main__":
sys.stderr.write("error: %s specified more than once\n" % node)
raise SystemExit(1)
try:
dev = serial.Serial(node, baud, rtscts = args.flow)
dev = MySerial(None, baud, rtscts = args.flow)
dev.port = node
if args.esp:
# Force DTR and RTS high by setting to false
dev.dtr = False
dev.rts = False
dev.open()
except serial.serialutil.SerialException:
sys.stderr.write("error opening %s\n" % node)
raise SystemExit(1)
@ -315,8 +415,11 @@ if __name__ == "__main__":
term = Jimterm(devs,
transmit_all = args.all,
add_cr = args.crlf,
raw = args.raw,
color = (os.name == "posix" and not args.mono))
send_cr = args.lfcr,
raw = raw,
color = (os.name == "posix" and not args.mono),
bufsize = args.bufsize)
if not args.quiet:
term.print_header(nodes, bauds)
term.print_header(nodes, bauds, sys.stderr)
term.run()