Works with multiple devices now.

Plug in a lot of devices and try: "./terminal.py /dev/ttyUSB*"
This commit is contained in:
Jim Paris 2012-11-14 18:35:05 -05:00
parent 5fbcca0675
commit 2065c9ad12

View File

@ -34,7 +34,7 @@ class Color(object):
self.reset = colorama.Style.RESET_ALL
def code(self, n):
return self.codes[n % self.total]
color = Color()
g_color = Color()
if os.name == 'nt':
import msvcrt
@ -90,19 +90,30 @@ else:
class Miniterm:
"""Normal interactive terminal"""
def __init__(self, serial, suppress_bytes = None):
self.serial = serial
def __init__(self, serials, suppress_bytes = None):
self.serials = serials
self.suppress_bytes = suppress_bytes or ""
self.last_color = ""
self.threads = []
def start(self):
self.alive = True
# serial->console
self.threads.append(threading.Thread(target=self.reader))
# console->serial
# serial->console, all devices
for (n, serial) in enumerate(self.serials):
self.threads.append(threading.Thread(
target = self.reader,
args = (serial, g_color.code(n))
))
# console->serial, just the first device
self.console = Console()
self.threads.append(threading.Thread(target=self.writer))
# start them
self.threads.append(threading.Thread(
target = self.writer,
args = (self.serials[0],)
))
# start all threads
for thread in self.threads:
thread.daemon = True
thread.start()
@ -115,13 +126,16 @@ class Miniterm:
while thread.isAlive():
thread.join(0.1)
def reader(self):
def reader(self, serial, color):
"""loop and copy serial->console"""
try:
while self.alive:
data = self.serial.read(1)
data = serial.read(1)
if not data:
continue
if color != self.last_color:
self.last_color = color
sys.stdout.write(color)
if ((ord(data) >= 32 and ord(data) < 128) or
data == '\r' or data == '\n' or data == '\t'):
sys.stdout.write(data)
@ -133,7 +147,7 @@ class Miniterm:
self.console.cleanup()
os._exit(1)
def writer(self):
def writer(self, serial):
"""loop and copy console->serial until ^C"""
try:
while self.alive:
@ -157,20 +171,32 @@ class Miniterm:
# Don't send these bytes
continue
else:
self.serial.write(c) # send character
serial.write(c) # send character
except Exception as e:
traceback.print_exc()
self.console.cleanup()
os._exit(1)
def run(self):
saved_timeout = self.serial.timeout
self.serial.timeout = 0.1
# Set all serial port timeouts to 0.1 sec
saved_timeouts = []
for (n, serial) in enumerate(self.serials):
saved_timeouts.append(serial.timeout)
serial.timeout = 0.1
# Handle SIGINT gracefully
signal.signal(signal.SIGINT, lambda *args: self.stop())
# Go
self.start()
self.join()
# Restore serial port timeouts
for (n, serial) in enumerate(self.serials):
serial.timeout = saved_timeouts[n]
# Cleanup
print ""
self.serial.timeout = saved_timeout
self.console.cleanup()
if __name__ == "__main__":
@ -195,7 +221,8 @@ if __name__ == "__main__":
args = parser.parse_args()
devs = []
color.setup(len(args.device))
used_nodes = []
g_color.setup(len(args.device))
for (n, device) in enumerate(args.device):
m = re.search(r"^(.*)@([1-9][0-9]*)$", device)
if m is not None:
@ -204,18 +231,24 @@ if __name__ == "__main__":
else:
node = device
baud = args.baudrate
if node in used_nodes:
sys.stderr.write("error: %s already open!\n" % node)
raise SystemExit(1)
try:
dev = serial.Serial(node, baud)
except serial.serialutil.SerialException:
sys.stderr.write("error opening %s\n" % node)
raise SystemExit(1)
if not args.quiet:
print color.code(n) + node + ", " + str(args.baudrate) + " baud"
print (g_color.code(n)
+ node + ", " + str(args.baudrate) + " baud"
+ g_color.reset)
used_nodes.append(node)
devs.append(dev)
if not args.quiet:
print color.reset + "^C to exit"
print color.reset + "----------"
term = Miniterm(dev)
print "^C to exit"
print "----------"
term = Miniterm(devs)
term.run()