You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

417 lines
14 KiB

  1. #!/usr/bin/python
  2. # Jim Paris <jim@jtan.com>
  3. # Simple terminal program for serial devices. Supports setting
  4. # baudrates and simple LF->CRLF mapping on input, and basic
  5. # flow control, but nothing fancy.
  6. # ^C quits. There is no escaping, so you can't currently send this
  7. # character to the remote host. Piping input or output should work.
  8. # Supports multiple serial devices simultaneously. When using more
  9. # than one, each device's output is in a different color. Input
  10. # is directed to the first device, or can be sent to all devices
  11. # with --all.
  12. import sys
  13. import os
  14. import serial
  15. import threading
  16. import traceback
  17. import time
  18. import signal
  19. import fcntl
  20. import string
  21. import re
  22. # Need OS-specific method for getting keyboard input.
  23. if os.name == 'nt':
  24. import msvcrt
  25. class Console:
  26. def __init__(self, bufsize = 1):
  27. # Buffer size > 1 not supported on Windows
  28. self.tty = True
  29. def cleanup(self):
  30. pass
  31. def getkey(self):
  32. start = time.time()
  33. while True:
  34. z = msvcrt.getch()
  35. if z == '\0' or z == '\xe0': # function keys
  36. msvcrt.getch()
  37. else:
  38. if z == '\r':
  39. return '\n'
  40. return z
  41. if (time.time() - start) > 0.1:
  42. return None
  43. class MySerial(serial.Serial):
  44. def nonblocking_read(self, size=1):
  45. # Buffer size > 1 not supported on Windows
  46. return self.read(1)
  47. elif os.name == 'posix':
  48. import termios, select, errno
  49. class Console:
  50. def __init__(self, bufsize = 65536):
  51. self.bufsize = bufsize
  52. self.fd = sys.stdin.fileno()
  53. if os.isatty(self.fd):
  54. self.tty = True
  55. self.old = termios.tcgetattr(self.fd)
  56. tc = termios.tcgetattr(self.fd)
  57. tc[3] = tc[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
  58. tc[6][termios.VMIN] = 1
  59. tc[6][termios.VTIME] = 0
  60. termios.tcsetattr(self.fd, termios.TCSANOW, tc)
  61. else:
  62. self.tty = False
  63. def cleanup(self):
  64. if self.tty:
  65. termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
  66. def getkey(self):
  67. # Return -1 if we don't get input in 0.1 seconds, so that
  68. # the main code can check the "alive" flag and respond to SIGINT.
  69. [r, w, x] = select.select([self.fd], [], [self.fd], 0.1)
  70. if r:
  71. return os.read(self.fd, self.bufsize)
  72. elif x:
  73. return ''
  74. else:
  75. return None
  76. class MySerial(serial.Serial):
  77. def nonblocking_read(self, size=1):
  78. [r, w, x] = select.select([self.fd], [], [self.fd], self._timeout)
  79. if r:
  80. try:
  81. return os.read(self.fd, size)
  82. except OSError as e:
  83. if e.errno == errno.EAGAIN:
  84. return None
  85. raise
  86. elif x:
  87. raise SerialException("exception (device disconnected?)")
  88. else:
  89. return None # timeout
  90. else:
  91. raise ("Sorry, no terminal implementation for your platform (%s) "
  92. "available." % sys.platform)
  93. class JimtermColor(object):
  94. def __init__(self):
  95. self.setup(1)
  96. def setup(self, total):
  97. if total > 1:
  98. self.codes = [
  99. "\x1b[1;36m", # cyan
  100. "\x1b[1;33m", # yellow
  101. "\x1b[1;35m", # magenta
  102. "\x1b[1;31m", # red
  103. "\x1b[1;32m", # green
  104. "\x1b[1;34m", # blue
  105. "\x1b[1;37m", # white
  106. ]
  107. self.reset = "\x1b[0m"
  108. else:
  109. self.codes = [""]
  110. self.reset = ""
  111. def code(self, n):
  112. return self.codes[n % len(self.codes)]
  113. class Jimterm:
  114. """Normal interactive terminal"""
  115. def __init__(self,
  116. serials,
  117. suppress_write_bytes = None,
  118. suppress_read_firstnull = True,
  119. transmit_all = False,
  120. add_cr = False,
  121. send_cr = False,
  122. raw = True,
  123. color = True,
  124. bufsize = 65536):
  125. self.color = JimtermColor()
  126. if color:
  127. self.color.setup(len(serials))
  128. self.serials = serials
  129. self.suppress_write_bytes = suppress_write_bytes
  130. self.suppress_read_firstnull = suppress_read_firstnull
  131. self.last_color = ""
  132. self.threads = []
  133. self.transmit_all = transmit_all
  134. self.add_cr = add_cr
  135. self.send_cr = send_cr
  136. self.raw = raw
  137. self.bufsize = bufsize
  138. self.quote_re = None
  139. self.output_lock = threading.Lock()
  140. def print_header(self, nodes, bauds, output = sys.stdout):
  141. for (n, (node, baud)) in enumerate(zip(nodes, bauds)):
  142. output.write(self.color.code(n)
  143. + node + ", " + str(baud) + " baud"
  144. + self.color.reset + "\n")
  145. if sys.stdin.isatty():
  146. output.write("^C to exit\n")
  147. output.write("----------\n")
  148. output.flush()
  149. def start(self):
  150. self.alive = True
  151. # Set up console
  152. self.console = Console(self.bufsize)
  153. # serial->console, all devices
  154. for (n, serial) in enumerate(self.serials):
  155. self.threads.append(threading.Thread(
  156. target = self.reader,
  157. args = (serial, self.color.code(n))
  158. ))
  159. # console->serial
  160. self.threads.append(threading.Thread(target = self.writer))
  161. # start all threads
  162. for thread in self.threads:
  163. thread.daemon = True
  164. thread.start()
  165. def stop(self):
  166. self.alive = False
  167. def join(self):
  168. for thread in self.threads:
  169. while thread.isAlive():
  170. thread.join(0.1)
  171. def quote_raw(self, data):
  172. if self.quote_re is None:
  173. matcher = '[^%s]' % re.escape(string.printable + "\b")
  174. if sys.version_info < (3,):
  175. self.quote_re = re.compile(matcher)
  176. qf = lambda x: ("\\x%02x" % ord(x.group(0)))
  177. else:
  178. self.quote_re = re.compile(matcher.encode('ascii'))
  179. qf = lambda x: ("\\x%02x" % ord(x.group(0))).encode('ascii')
  180. self.quote_func = qf
  181. return self.quote_re.sub(self.quote_func, data)
  182. def reader(self, serial, color):
  183. """loop and copy serial->console"""
  184. first = True
  185. try:
  186. if (sys.version_info < (3,)):
  187. null = '\x00'
  188. else:
  189. null = b'\x00'
  190. while self.alive:
  191. data = serial.nonblocking_read(self.bufsize)
  192. if data is None:
  193. continue
  194. if not len(data):
  195. raise Exception("read returned EOF")
  196. # don't print a NULL if it's the first character we
  197. # read. This hides startup/port-opening glitches with
  198. # some serial devices.
  199. if self.suppress_read_firstnull and first and data[0] == null:
  200. first = False
  201. data = data[1:]
  202. first = False
  203. self.output_lock.acquire()
  204. if color != self.last_color:
  205. self.last_color = color
  206. os.write(sys.stdout.fileno(), color)
  207. if self.add_cr:
  208. if sys.version_info < (3,):
  209. data = data.replace('\n', '\r\n')
  210. else:
  211. data = data.replace(b'\n', b'\r\n')
  212. if not self.raw:
  213. data = self.quote_raw(data)
  214. os.write(sys.stdout.fileno(), data)
  215. self.output_lock.release()
  216. except Exception as e:
  217. self.console.cleanup()
  218. sys.stdout.write(color)
  219. sys.stdout.flush()
  220. traceback.print_exc()
  221. sys.stdout.write(self.color.reset)
  222. sys.stdout.flush()
  223. os._exit(1)
  224. def writer(self):
  225. """loop and copy console->serial until ^C"""
  226. try:
  227. if (sys.version_info < (3,)):
  228. ctrlc = '\x03'
  229. else:
  230. ctrlc = b'\x03'
  231. while self.alive:
  232. try:
  233. c = self.console.getkey()
  234. except KeyboardInterrupt:
  235. self.stop()
  236. return
  237. if c is None:
  238. # No input, try again.
  239. continue
  240. elif self.console.tty and ctrlc in c:
  241. # Try to catch ^C that didn't trigger KeyboardInterrupt
  242. self.stop()
  243. return
  244. elif c == '':
  245. # Probably EOF on input. Wait a tiny bit so we can
  246. # flush the remaining input, then stop.
  247. time.sleep(0.25)
  248. self.stop()
  249. return
  250. else:
  251. # Remove bytes we don't want to send
  252. if self.suppress_write_bytes is not None:
  253. c = c.translate(None, self.suppress_write_bytes)
  254. if self.send_cr and c == '\n':
  255. c = '\r'
  256. # Send character
  257. if self.transmit_all:
  258. for serial in self.serials:
  259. serial.write(c)
  260. else:
  261. self.serials[0].write(c)
  262. except Exception as e:
  263. self.console.cleanup()
  264. sys.stdout.write(self.color.reset)
  265. sys.stdout.flush()
  266. traceback.print_exc()
  267. os._exit(1)
  268. def run(self):
  269. # Set all serial port timeouts to 0.1 sec
  270. saved_timeouts = []
  271. for serial in self.serials:
  272. saved_timeouts.append(serial.timeout)
  273. serial.timeout = 0.1
  274. # Work around https://sourceforge.net/p/pyserial/bugs/151/
  275. saved_writeTimeouts = []
  276. for serial in self.serials:
  277. saved_writeTimeouts.append(serial.writeTimeout)
  278. serial.writeTimeout = 1000000
  279. # Handle SIGINT gracefully
  280. signal.signal(signal.SIGINT, lambda *args: self.stop())
  281. # Go
  282. self.start()
  283. self.join()
  284. # Restore serial port timeouts
  285. for (serial, saved) in zip(self.serials, saved_timeouts):
  286. serial.timeout = saved
  287. for (serial, saved) in zip(self.serials, saved_writeTimeouts):
  288. serial.writeTimeout = saved
  289. # Cleanup
  290. sys.stdout.write(self.color.reset + "\n")
  291. self.console.cleanup()
  292. if __name__ == "__main__":
  293. import argparse
  294. import re
  295. formatter = argparse.ArgumentDefaultsHelpFormatter
  296. description = ("Simple serial terminal that supports multiple devices. "
  297. "If more than one device is specified, device output is "
  298. "shown in varying colors. All input goes to the "
  299. "first device.")
  300. parser = argparse.ArgumentParser(description = description,
  301. formatter_class = formatter)
  302. parser.add_argument("device", metavar="DEVICE", nargs="+",
  303. help="Serial device. Specify DEVICE@BAUD for "
  304. "per-device baudrates.")
  305. parser.add_argument("--quiet", "-q", action="store_true",
  306. help="Don't print header")
  307. parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
  308. help="Default baudrate for all devices", default=115200)
  309. parser.add_argument("--crlf", "-c", action="store_true",
  310. help="Add CR before incoming LF")
  311. parser.add_argument("--lfcr", "-C", action="store_true",
  312. help="Send CR instead of LF on output")
  313. parser.add_argument("--all", "-a", action="store_true",
  314. help="Send keystrokes to all devices, not just "
  315. "the first one")
  316. parser.add_argument("--mono", "-m", action="store_true",
  317. help="Don't use colors in output")
  318. parser.add_argument("--flow", "-f", action="store_true",
  319. help="Enable RTS/CTS flow control")
  320. parser.add_argument("--bufsize", "-z", metavar="SIZE", type=int,
  321. help="Buffer size for reads and writes", default=65536)
  322. group = parser.add_mutually_exclusive_group(required = False)
  323. group.add_argument("--raw", "-r", action="store_true",
  324. default=argparse.SUPPRESS,
  325. help="Output characters directly "
  326. "(default, if stdout is not a tty)")
  327. group.add_argument("--no-raw", "-R", action="store_true",
  328. default=argparse.SUPPRESS,
  329. help="Quote unprintable characters "
  330. "(default, if stdout is a tty)")
  331. args = parser.parse_args()
  332. piped = not sys.stdout.isatty()
  333. raw = "raw" in args or (piped and "no_raw" not in args)
  334. devs = []
  335. nodes = []
  336. bauds = []
  337. for (n, device) in enumerate(args.device):
  338. m = re.search(r"^(.*)@([1-9][0-9]*)$", device)
  339. if m is not None:
  340. node = m.group(1)
  341. baud = m.group(2)
  342. else:
  343. node = device
  344. baud = args.baudrate
  345. if node in nodes:
  346. sys.stderr.write("error: %s specified more than once\n" % node)
  347. raise SystemExit(1)
  348. try:
  349. dev = MySerial(node, baud, rtscts = args.flow)
  350. except serial.serialutil.SerialException:
  351. sys.stderr.write("error opening %s\n" % node)
  352. raise SystemExit(1)
  353. nodes.append(node)
  354. bauds.append(baud)
  355. devs.append(dev)
  356. term = Jimterm(devs,
  357. transmit_all = args.all,
  358. add_cr = args.crlf,
  359. send_cr = args.lfcr,
  360. raw = raw,
  361. color = (os.name == "posix" and not args.mono),
  362. bufsize = args.bufsize)
  363. if not args.quiet:
  364. term.print_header(nodes, bauds, sys.stderr)
  365. term.run()