You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

323 lines
11 KiB

  1. #!/usr/bin/python
  2. # Jim Paris <jim@jtan.com>
  3. # Simple terminal program for serial devices. Supports setting
  4. # baudrates and simple LF->CRLF mapping on input, and basic
  5. # flow control, but nothing fancy.
  6. # ^C quits. There is no escaping, so you can't currently send this
  7. # character to the remote host. Piping input or output should work.
  8. # Supports multiple serial devices simultaneously. When using more
  9. # than one, each device's output is in a different color. Input
  10. # is directed to the first device, or can be sent to all devices
  11. # with --all.
  12. import sys
  13. import os
  14. import serial
  15. import threading
  16. import traceback
  17. import time
  18. import signal
  19. # Need OS-specific method for getting keyboard input.
  20. if os.name == 'nt':
  21. import msvcrt
  22. class Console:
  23. def __init__(self):
  24. pass
  25. def cleanup(self):
  26. pass
  27. def getkey(self):
  28. while 1:
  29. z = msvcrt.getch()
  30. if z == '\0' or z == '\xe0': # function keys
  31. msvcrt.getch()
  32. else:
  33. if z == '\r':
  34. return '\n'
  35. return z
  36. elif os.name == 'posix':
  37. import termios, select
  38. class Console:
  39. def __init__(self):
  40. self.fd = sys.stdin.fileno()
  41. try:
  42. self.old = termios.tcgetattr(self.fd)
  43. tc = termios.tcgetattr(self.fd)
  44. tc[3] = tc[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
  45. tc[6][termios.VMIN] = 1
  46. tc[6][termios.VTIME] = 0
  47. termios.tcsetattr(self.fd, termios.TCSANOW, tc)
  48. except termios.error:
  49. # ignore errors, so we can pipe stuff to this script
  50. pass
  51. def cleanup(self):
  52. try:
  53. termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
  54. except:
  55. # ignore errors, so we can pipe stuff to this script
  56. pass
  57. def getkey(self):
  58. # Return -1 if we don't get input in 0.1 seconds, so that
  59. # the main code can check the "alive" flag and respond to SIGINT.
  60. [r, w, x] = select.select([self.fd], [], [self.fd], 0.1)
  61. if r:
  62. return os.read(self.fd, 1)
  63. elif x:
  64. return ''
  65. else:
  66. return -1
  67. else:
  68. raise ("Sorry, no terminal implementation for your platform (%s) "
  69. "available." % sys.platform)
  70. class JimtermColor(object):
  71. def __init__(self):
  72. self.setup(1)
  73. def setup(self, total):
  74. if total > 1:
  75. self.codes = [
  76. "\x1b[1;36m", # cyan
  77. "\x1b[1;33m", # yellow
  78. "\x1b[1;35m", # magenta
  79. "\x1b[1;31m", # red
  80. "\x1b[1;32m", # green
  81. "\x1b[1;34m", # blue
  82. "\x1b[1;37m", # white
  83. ]
  84. self.reset = "\x1b[0m"
  85. else:
  86. self.codes = [""]
  87. self.reset = ""
  88. def code(self, n):
  89. return self.codes[n % len(self.codes)]
  90. class Jimterm:
  91. """Normal interactive terminal"""
  92. def __init__(self,
  93. serials,
  94. suppress_write_bytes = None,
  95. suppress_read_firstnull = True,
  96. transmit_all = False,
  97. add_cr = False,
  98. raw = False,
  99. color = True):
  100. self.color = JimtermColor()
  101. if color:
  102. self.color.setup(len(serials))
  103. self.serials = serials
  104. self.suppress_write_bytes = suppress_write_bytes or ""
  105. self.suppress_read_firstnull = suppress_read_firstnull
  106. self.last_color = ""
  107. self.threads = []
  108. self.transmit_all = transmit_all
  109. self.add_cr = add_cr
  110. self.raw = raw
  111. def print_header(self, nodes, bauds):
  112. for (n, (node, baud)) in enumerate(zip(nodes, bauds)):
  113. print (self.color.code(n)
  114. + node + ", " + str(baud) + " baud"
  115. + self.color.reset)
  116. print "^C to exit"
  117. print "----------"
  118. def start(self):
  119. self.alive = True
  120. # serial->console, all devices
  121. for (n, serial) in enumerate(self.serials):
  122. self.threads.append(threading.Thread(
  123. target = self.reader,
  124. args = (serial, self.color.code(n))
  125. ))
  126. # console->serial
  127. self.console = Console()
  128. self.threads.append(threading.Thread(target = self.writer))
  129. # start all threads
  130. for thread in self.threads:
  131. thread.daemon = True
  132. thread.start()
  133. def stop(self):
  134. self.alive = False
  135. def join(self):
  136. for thread in self.threads:
  137. while thread.isAlive():
  138. thread.join(0.1)
  139. def reader(self, serial, color):
  140. """loop and copy serial->console"""
  141. first = True
  142. try:
  143. while self.alive:
  144. data = serial.read(1)
  145. if not data:
  146. continue
  147. # don't print a NULL if it's the first character we
  148. # read. This hides startup/port-opening glitches with
  149. # some serial devices.
  150. if self.suppress_read_firstnull and first and data == '\0':
  151. first = False
  152. continue
  153. first = False
  154. if color != self.last_color:
  155. self.last_color = color
  156. sys.stdout.write(color)
  157. if (self.raw or
  158. (ord(data) >= 32 and ord(data) < 128) or
  159. data == '\r' or data == '\n' or data == '\t'):
  160. if self.add_cr and data == '\n':
  161. sys.stdout.write('\r' + data)
  162. else:
  163. sys.stdout.write(data)
  164. else:
  165. sys.stdout.write('\\x'+("0"+hex(ord(data))[2:])[-2:])
  166. sys.stdout.flush()
  167. except Exception as e:
  168. sys.stdout.write(color)
  169. sys.stdout.flush()
  170. traceback.print_exc()
  171. sys.stdout.write(self.color.reset)
  172. sys.stdout.flush()
  173. self.console.cleanup()
  174. os._exit(1)
  175. def writer(self):
  176. """loop and copy console->serial until ^C"""
  177. try:
  178. while self.alive:
  179. try:
  180. c = self.console.getkey()
  181. except KeyboardInterrupt:
  182. c = '\x03'
  183. if c == '\x03':
  184. self.stop()
  185. return
  186. elif c == -1:
  187. # No input, try again
  188. continue
  189. elif c == '':
  190. # EOF on input. Wait a tiny bit so we can
  191. # flush the remaining input, then stop.
  192. time.sleep(0.25)
  193. self.stop()
  194. return
  195. elif c in self.suppress_write_bytes:
  196. # Don't send these bytes
  197. continue
  198. else:
  199. # send character
  200. if self.transmit_all:
  201. for serial in self.serials:
  202. serial.write(c)
  203. else:
  204. self.serials[0].write(c)
  205. except Exception as e:
  206. sys.stdout.write(self.color.reset)
  207. sys.stdout.flush()
  208. traceback.print_exc()
  209. self.console.cleanup()
  210. os._exit(1)
  211. def run(self):
  212. # Set all serial port timeouts to 0.1 sec
  213. saved_timeouts = []
  214. for (n, serial) in enumerate(self.serials):
  215. saved_timeouts.append(serial.timeout)
  216. serial.timeout = 0.1
  217. # Handle SIGINT gracefully
  218. signal.signal(signal.SIGINT, lambda *args: self.stop())
  219. # Go
  220. self.start()
  221. self.join()
  222. # Restore serial port timeouts
  223. for (n, serial) in enumerate(self.serials):
  224. serial.timeout = saved_timeouts[n]
  225. # Cleanup
  226. print self.color.reset # and a newline
  227. self.console.cleanup()
  228. if __name__ == "__main__":
  229. import argparse
  230. import re
  231. formatter = argparse.ArgumentDefaultsHelpFormatter
  232. description = ("Simple serial terminal that supports multiple devices. "
  233. "If more than one device is specified, device output is "
  234. "shown in varying colors. All input goes to the "
  235. "first device.")
  236. parser = argparse.ArgumentParser(description = description,
  237. formatter_class = formatter)
  238. parser.add_argument("device", metavar="DEVICE", nargs="+",
  239. help="Serial device. Specify DEVICE@BAUD for "
  240. "per-device baudrates.")
  241. parser.add_argument("--quiet", "-q", action="store_true",
  242. help="Less verbose output (omit header)")
  243. parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
  244. help="Default baudrate for all devices", default=115200)
  245. parser.add_argument("--crlf", "-c", action="store_true",
  246. help="Add CR before incoming LF")
  247. parser.add_argument("--all", "-a", action="store_true",
  248. help="Send keystrokes to all devices, not just "
  249. "the first one")
  250. parser.add_argument("--mono", "-m", action="store_true",
  251. help="Don't use colors in output")
  252. parser.add_argument("--raw", "-r", action="store_true",
  253. help="Don't escape unprintable characters")
  254. parser.add_argument("--flow", "-f", action="store_true",
  255. help="Enable RTS/CTS flow control")
  256. args = parser.parse_args()
  257. devs = []
  258. nodes = []
  259. bauds = []
  260. for (n, device) in enumerate(args.device):
  261. m = re.search(r"^(.*)@([1-9][0-9]*)$", device)
  262. if m is not None:
  263. node = m.group(1)
  264. baud = m.group(2)
  265. else:
  266. node = device
  267. baud = args.baudrate
  268. if node in nodes:
  269. sys.stderr.write("error: %s specified more than once\n" % node)
  270. raise SystemExit(1)
  271. try:
  272. dev = serial.Serial(node, baud, rtscts = args.flow)
  273. except serial.serialutil.SerialException:
  274. sys.stderr.write("error opening %s\n" % node)
  275. raise SystemExit(1)
  276. nodes.append(node)
  277. bauds.append(baud)
  278. devs.append(dev)
  279. term = Jimterm(devs,
  280. transmit_all = args.all,
  281. add_cr = args.crlf,
  282. raw = args.raw,
  283. color = (os.name == "posix" and not args.mono))
  284. if not args.quiet:
  285. term.print_header(nodes, bauds)
  286. term.run()