@@ -0,0 +1,6 @@ | |||
from .initialise import init, deinit, reinit | |||
from .ansi import Fore, Back, Style | |||
from .ansitowin32 import AnsiToWin32 | |||
VERSION = '0.2.4' | |||
@@ -0,0 +1,49 @@ | |||
''' | |||
This module generates ANSI character codes to printing colors to terminals. | |||
See: http://en.wikipedia.org/wiki/ANSI_escape_code | |||
''' | |||
CSI = '\033[' | |||
def code_to_chars(code): | |||
return CSI + str(code) + 'm' | |||
class AnsiCodes(object): | |||
def __init__(self, codes): | |||
for name in dir(codes): | |||
if not name.startswith('_'): | |||
value = getattr(codes, name) | |||
setattr(self, name, code_to_chars(value)) | |||
class AnsiFore: | |||
BLACK = 30 | |||
RED = 31 | |||
GREEN = 32 | |||
YELLOW = 33 | |||
BLUE = 34 | |||
MAGENTA = 35 | |||
CYAN = 36 | |||
WHITE = 37 | |||
RESET = 39 | |||
class AnsiBack: | |||
BLACK = 40 | |||
RED = 41 | |||
GREEN = 42 | |||
YELLOW = 43 | |||
BLUE = 44 | |||
MAGENTA = 45 | |||
CYAN = 46 | |||
WHITE = 47 | |||
RESET = 49 | |||
class AnsiStyle: | |||
BRIGHT = 1 | |||
DIM = 2 | |||
NORMAL = 22 | |||
RESET_ALL = 0 | |||
Fore = AnsiCodes( AnsiFore ) | |||
Back = AnsiCodes( AnsiBack ) | |||
Style = AnsiCodes( AnsiStyle ) | |||
@@ -0,0 +1,182 @@ | |||
import re | |||
import sys | |||
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style | |||
from .winterm import WinTerm, WinColor, WinStyle | |||
from .win32 import windll | |||
if windll is not None: | |||
winterm = WinTerm() | |||
def is_a_tty(stream): | |||
return hasattr(stream, 'isatty') and stream.isatty() | |||
class StreamWrapper(object): | |||
''' | |||
Wraps a stream (such as stdout), acting as a transparent proxy for all | |||
attribute access apart from method 'write()', which is delegated to our | |||
Converter instance. | |||
''' | |||
def __init__(self, wrapped, converter): | |||
# double-underscore everything to prevent clashes with names of | |||
# attributes on the wrapped stream object. | |||
self.__wrapped = wrapped | |||
self.__convertor = converter | |||
def __getattr__(self, name): | |||
return getattr(self.__wrapped, name) | |||
def write(self, text): | |||
self.__convertor.write(text) | |||
class AnsiToWin32(object): | |||
''' | |||
Implements a 'write()' method which, on Windows, will strip ANSI character | |||
sequences from the text, and if outputting to a tty, will convert them into | |||
win32 function calls. | |||
''' | |||
ANSI_RE = re.compile('\033\[((?:\d|;)*)([a-zA-Z])') | |||
def __init__(self, wrapped, convert=None, strip=None, autoreset=False): | |||
# The wrapped stream (normally sys.stdout or sys.stderr) | |||
self.wrapped = wrapped | |||
# should we reset colors to defaults after every .write() | |||
self.autoreset = autoreset | |||
# create the proxy wrapping our output stream | |||
self.stream = StreamWrapper(wrapped, self) | |||
on_windows = sys.platform.startswith('win') | |||
# should we strip ANSI sequences from our output? | |||
if strip is None: | |||
strip = on_windows | |||
self.strip = strip | |||
# should we should convert ANSI sequences into win32 calls? | |||
if convert is None: | |||
convert = on_windows and is_a_tty(wrapped) | |||
self.convert = convert | |||
# dict of ansi codes to win32 functions and parameters | |||
self.win32_calls = self.get_win32_calls() | |||
# are we wrapping stderr? | |||
self.on_stderr = self.wrapped is sys.stderr | |||
def should_wrap(self): | |||
''' | |||
True if this class is actually needed. If false, then the output | |||
stream will not be affected, nor will win32 calls be issued, so | |||
wrapping stdout is not actually required. This will generally be | |||
False on non-Windows platforms, unless optional functionality like | |||
autoreset has been requested using kwargs to init() | |||
''' | |||
return self.convert or self.strip or self.autoreset | |||
def get_win32_calls(self): | |||
if self.convert and winterm: | |||
return { | |||
AnsiStyle.RESET_ALL: (winterm.reset_all, ), | |||
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT), | |||
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL), | |||
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL), | |||
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK), | |||
AnsiFore.RED: (winterm.fore, WinColor.RED), | |||
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN), | |||
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW), | |||
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE), | |||
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA), | |||
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN), | |||
AnsiFore.WHITE: (winterm.fore, WinColor.GREY), | |||
AnsiFore.RESET: (winterm.fore, ), | |||
AnsiBack.BLACK: (winterm.back, WinColor.BLACK), | |||
AnsiBack.RED: (winterm.back, WinColor.RED), | |||
AnsiBack.GREEN: (winterm.back, WinColor.GREEN), | |||
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW), | |||
AnsiBack.BLUE: (winterm.back, WinColor.BLUE), | |||
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA), | |||
AnsiBack.CYAN: (winterm.back, WinColor.CYAN), | |||
AnsiBack.WHITE: (winterm.back, WinColor.GREY), | |||
AnsiBack.RESET: (winterm.back, ), | |||
} | |||
def write(self, text): | |||
if self.strip or self.convert: | |||
self.write_and_convert(text) | |||
else: | |||
self.wrapped.write(text) | |||
self.wrapped.flush() | |||
if self.autoreset: | |||
self.reset_all() | |||
def reset_all(self): | |||
if self.convert: | |||
self.call_win32('m', (0,)) | |||
elif is_a_tty(self.wrapped): | |||
self.wrapped.write(Style.RESET_ALL) | |||
def write_and_convert(self, text): | |||
''' | |||
Write the given text to our wrapped stream, stripping any ANSI | |||
sequences from the text, and optionally converting them into win32 | |||
calls. | |||
''' | |||
cursor = 0 | |||
for match in self.ANSI_RE.finditer(text): | |||
start, end = match.span() | |||
self.write_plain_text(text, cursor, start) | |||
self.convert_ansi(*match.groups()) | |||
cursor = end | |||
self.write_plain_text(text, cursor, len(text)) | |||
def write_plain_text(self, text, start, end): | |||
if start < end: | |||
self.wrapped.write(text[start:end]) | |||
self.wrapped.flush() | |||
def convert_ansi(self, paramstring, command): | |||
if self.convert: | |||
params = self.extract_params(paramstring) | |||
self.call_win32(command, params) | |||
def extract_params(self, paramstring): | |||
def split(paramstring): | |||
for p in paramstring.split(';'): | |||
if p != '': | |||
yield int(p) | |||
return tuple(split(paramstring)) | |||
def call_win32(self, command, params): | |||
if params == []: | |||
params = [0] | |||
if command == 'm': | |||
for param in params: | |||
if param in self.win32_calls: | |||
func_args = self.win32_calls[param] | |||
func = func_args[0] | |||
args = func_args[1:] | |||
kwargs = dict(on_stderr=self.on_stderr) | |||
func(*args, **kwargs) | |||
elif command in ('H', 'f'): # set cursor position | |||
func = winterm.set_cursor_position | |||
func(params, on_stderr=self.on_stderr) | |||
elif command in ('J'): | |||
func = winterm.erase_data | |||
func(params, on_stderr=self.on_stderr) | |||
@@ -0,0 +1,55 @@ | |||
import atexit | |||
import sys | |||
from .ansitowin32 import AnsiToWin32 | |||
orig_stdout = sys.stdout | |||
orig_stderr = sys.stderr | |||
wrapped_stdout = sys.stdout | |||
wrapped_stderr = sys.stderr | |||
atexit_done = False | |||
def reset_all(): | |||
AnsiToWin32(orig_stdout).reset_all() | |||
def init(autoreset=False, convert=None, strip=None, wrap=True): | |||
if not wrap and any([autoreset, convert, strip]): | |||
raise ValueError('wrap=False conflicts with any other arg=True') | |||
global wrapped_stdout, wrapped_stderr | |||
sys.stdout = wrapped_stdout = \ | |||
wrap_stream(orig_stdout, convert, strip, autoreset, wrap) | |||
sys.stderr = wrapped_stderr = \ | |||
wrap_stream(orig_stderr, convert, strip, autoreset, wrap) | |||
global atexit_done | |||
if not atexit_done: | |||
atexit.register(reset_all) | |||
atexit_done = True | |||
def deinit(): | |||
sys.stdout = orig_stdout | |||
sys.stderr = orig_stderr | |||
def reinit(): | |||
sys.stdout = wrapped_stdout | |||
sys.stderr = wrapped_stdout | |||
def wrap_stream(stream, convert, strip, autoreset, wrap): | |||
if wrap: | |||
wrapper = AnsiToWin32(stream, | |||
convert=convert, strip=strip, autoreset=autoreset) | |||
if wrapper.should_wrap(): | |||
stream = wrapper.stream | |||
return stream | |||
@@ -0,0 +1,109 @@ | |||
# from winbase.h | |||
STDOUT = -11 | |||
STDERR = -12 | |||
try: | |||
from ctypes import windll | |||
except ImportError: | |||
windll = None | |||
SetConsoleTextAttribute = lambda *_: None | |||
else: | |||
from ctypes import ( | |||
byref, Structure, c_char, c_short, c_uint32, c_ushort | |||
) | |||
handles = { | |||
STDOUT: windll.kernel32.GetStdHandle(STDOUT), | |||
STDERR: windll.kernel32.GetStdHandle(STDERR), | |||
} | |||
SHORT = c_short | |||
WORD = c_ushort | |||
DWORD = c_uint32 | |||
TCHAR = c_char | |||
class COORD(Structure): | |||
"""struct in wincon.h""" | |||
_fields_ = [ | |||
('X', SHORT), | |||
('Y', SHORT), | |||
] | |||
class SMALL_RECT(Structure): | |||
"""struct in wincon.h.""" | |||
_fields_ = [ | |||
("Left", SHORT), | |||
("Top", SHORT), | |||
("Right", SHORT), | |||
("Bottom", SHORT), | |||
] | |||
class CONSOLE_SCREEN_BUFFER_INFO(Structure): | |||
"""struct in wincon.h.""" | |||
_fields_ = [ | |||
("dwSize", COORD), | |||
("dwCursorPosition", COORD), | |||
("wAttributes", WORD), | |||
("srWindow", SMALL_RECT), | |||
("dwMaximumWindowSize", COORD), | |||
] | |||
def __str__(self): | |||
return '(%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d)' % ( | |||
self.dwSize.Y, self.dwSize.X | |||
, self.dwCursorPosition.Y, self.dwCursorPosition.X | |||
, self.wAttributes | |||
, self.srWindow.Top, self.srWindow.Left, self.srWindow.Bottom, self.srWindow.Right | |||
, self.dwMaximumWindowSize.Y, self.dwMaximumWindowSize.X | |||
) | |||
def GetConsoleScreenBufferInfo(stream_id=STDOUT): | |||
handle = handles[stream_id] | |||
csbi = CONSOLE_SCREEN_BUFFER_INFO() | |||
success = windll.kernel32.GetConsoleScreenBufferInfo( | |||
handle, byref(csbi)) | |||
return csbi | |||
def SetConsoleTextAttribute(stream_id, attrs): | |||
handle = handles[stream_id] | |||
return windll.kernel32.SetConsoleTextAttribute(handle, attrs) | |||
def SetConsoleCursorPosition(stream_id, position): | |||
position = COORD(*position) | |||
# If the position is out of range, do nothing. | |||
if position.Y <= 0 or position.X <= 0: | |||
return | |||
# Adjust for Windows' SetConsoleCursorPosition: | |||
# 1. being 0-based, while ANSI is 1-based. | |||
# 2. expecting (x,y), while ANSI uses (y,x). | |||
adjusted_position = COORD(position.Y - 1, position.X - 1) | |||
# Adjust for viewport's scroll position | |||
sr = GetConsoleScreenBufferInfo(STDOUT).srWindow | |||
adjusted_position.Y += sr.Top | |||
adjusted_position.X += sr.Left | |||
# Resume normal processing | |||
handle = handles[stream_id] | |||
return windll.kernel32.SetConsoleCursorPosition(handle, adjusted_position) | |||
def FillConsoleOutputCharacter(stream_id, char, length, start): | |||
handle = handles[stream_id] | |||
char = TCHAR(char) | |||
length = DWORD(length) | |||
num_written = DWORD(0) | |||
# Note that this is hard-coded for ANSI (vs wide) bytes. | |||
success = windll.kernel32.FillConsoleOutputCharacterA( | |||
handle, char, length, start, byref(num_written)) | |||
return num_written.value | |||
def FillConsoleOutputAttribute(stream_id, attr, length, start): | |||
''' FillConsoleOutputAttribute( hConsole, csbi.wAttributes, dwConSize, coordScreen, &cCharsWritten )''' | |||
handle = handles[stream_id] | |||
attribute = WORD(attr) | |||
length = DWORD(length) | |||
num_written = DWORD(0) | |||
# Note that this is hard-coded for ANSI (vs wide) bytes. | |||
return windll.kernel32.FillConsoleOutputAttribute( | |||
handle, attribute, length, start, byref(num_written)) | |||
@@ -0,0 +1,102 @@ | |||
from . import win32 | |||
# from wincon.h | |||
class WinColor(object): | |||
BLACK = 0 | |||
BLUE = 1 | |||
GREEN = 2 | |||
CYAN = 3 | |||
RED = 4 | |||
MAGENTA = 5 | |||
YELLOW = 6 | |||
GREY = 7 | |||
# from wincon.h | |||
class WinStyle(object): | |||
NORMAL = 0x00 # dim text, dim background | |||
BRIGHT = 0x08 # bright text, dim background | |||
class WinTerm(object): | |||
def __init__(self): | |||
self._default = win32.GetConsoleScreenBufferInfo(win32.STDOUT).wAttributes | |||
self.set_attrs(self._default) | |||
self._default_fore = self._fore | |||
self._default_back = self._back | |||
self._default_style = self._style | |||
def get_attrs(self): | |||
return self._fore + self._back * 16 + self._style | |||
def set_attrs(self, value): | |||
self._fore = value & 7 | |||
self._back = (value >> 4) & 7 | |||
self._style = value & WinStyle.BRIGHT | |||
def reset_all(self, on_stderr=None): | |||
self.set_attrs(self._default) | |||
self.set_console(attrs=self._default) | |||
def fore(self, fore=None, on_stderr=False): | |||
if fore is None: | |||
fore = self._default_fore | |||
self._fore = fore | |||
self.set_console(on_stderr=on_stderr) | |||
def back(self, back=None, on_stderr=False): | |||
if back is None: | |||
back = self._default_back | |||
self._back = back | |||
self.set_console(on_stderr=on_stderr) | |||
def style(self, style=None, on_stderr=False): | |||
if style is None: | |||
style = self._default_style | |||
self._style = style | |||
self.set_console(on_stderr=on_stderr) | |||
def set_console(self, attrs=None, on_stderr=False): | |||
if attrs is None: | |||
attrs = self.get_attrs() | |||
handle = win32.STDOUT | |||
if on_stderr: | |||
handle = win32.STDERR | |||
win32.SetConsoleTextAttribute(handle, attrs) | |||
def set_cursor_position(self, position=None, on_stderr=False): | |||
if position is None: | |||
#I'm not currently tracking the position, so there is no default. | |||
#position = self.get_position() | |||
return | |||
handle = win32.STDOUT | |||
if on_stderr: | |||
handle = win32.STDERR | |||
win32.SetConsoleCursorPosition(handle, position) | |||
def erase_data(self, mode=0, on_stderr=False): | |||
# 0 (or None) should clear from the cursor to the end of the screen. | |||
# 1 should clear from the cursor to the beginning of the screen. | |||
# 2 should clear the entire screen. (And maybe move cursor to (1,1)?) | |||
# | |||
# At the moment, I only support mode 2. From looking at the API, it | |||
# should be possible to calculate a different number of bytes to clear, | |||
# and to do so relative to the cursor position. | |||
if mode[0] not in (2,): | |||
return | |||
handle = win32.STDOUT | |||
if on_stderr: | |||
handle = win32.STDERR | |||
# here's where we'll home the cursor | |||
coord_screen = win32.COORD(0,0) | |||
csbi = win32.GetConsoleScreenBufferInfo(handle) | |||
# get the number of character cells in the current buffer | |||
dw_con_size = csbi.dwSize.X * csbi.dwSize.Y | |||
# fill the entire screen with blanks | |||
win32.FillConsoleOutputCharacter(handle, ord(' '), dw_con_size, coord_screen) | |||
# now set the buffer's attributes accordingly | |||
win32.FillConsoleOutputAttribute(handle, self.get_attrs(), dw_con_size, coord_screen ); | |||
# put the cursor at (0, 0) | |||
win32.SetConsoleCursorPosition(handle, (coord_screen.X, coord_screen.Y)) |
@@ -1,7 +1,15 @@ | |||
#!/usr/bin/python | |||
# Based on miniterm sample in pyserial | |||
import sys, os, serial, threading, traceback, time | |||
import sys | |||
import os | |||
import serial | |||
import threading | |||
import traceback | |||
import time | |||
import colorama | |||
colorama.init() | |||
if os.name == 'nt': | |||
import msvcrt | |||
@@ -48,33 +56,38 @@ else: | |||
"available." % sys.platform) | |||
class Miniterm: | |||
"""Normal interactive terminal""" | |||
def __init__(self, serial): | |||
self.serial = serial | |||
self.threads = [] | |||
def start(self): | |||
self.alive = True | |||
#start serial->console thread | |||
self.receiver_thread = threading.Thread(target=self.reader) | |||
self.receiver_thread.daemon = True | |||
self.receiver_thread.start() | |||
#enter console->serial loop | |||
# serial->console | |||
self.threads.append(threading.Thread(target=self.reader)) | |||
# console->serial | |||
self.console = Console() | |||
self.transmitter_thread = threading.Thread(target=self.writer) | |||
self.transmitter_thread.daemon = True | |||
self.transmitter_thread.start() | |||
self.threads.append(threading.Thread(target=self.writer)) | |||
# start them | |||
for thread in self.threads: | |||
thread.daemon = True | |||
thread.start() | |||
def stop(self): | |||
self.alive = False | |||
def join(self, transmit_only=False): | |||
self.receiver_thread.join() | |||
self.transmitter_thread.join() | |||
def join(self): | |||
for thread in self.threads: | |||
thread.join() | |||
def reader(self): | |||
"""loop and copy serial->console""" | |||
try: | |||
while self.alive: | |||
data = self.serial.read(1) | |||
if not data: | |||
continue | |||
sys.stdout.write(data) | |||
sys.stdout.flush() | |||
except Exception as e: | |||
@@ -118,17 +131,27 @@ class Miniterm: | |||
if __name__ == "__main__": | |||
import argparse | |||
parser = argparse.ArgumentParser(description="Simple serial terminal") | |||
parser = argparse.ArgumentParser(description="Simple serial terminal. " | |||
"If two devices are provided, output from " | |||
"the first is shown in yellow, second in " | |||
"cyan, and input is discarded.") | |||
parser.add_argument('device', metavar='DEVICE', | |||
help='serial device') | |||
help='serial device 1') | |||
parser.add_argument('baudrate', metavar='BAUDRATE', type=int, nargs='?', | |||
help='baud rate', default=115200) | |||
help='baud rate 1', default=115200) | |||
parser.add_argument('device2', metavar="DEVICE2", nargs='?', | |||
help='serial device 2', default=None) | |||
parser.add_argument('baudrate2', metavar='BAUDRATE2', type=int, nargs='?', | |||
help='baud rate 2', default=115200) | |||
args = parser.parse_args() | |||
try: | |||
dev = serial.Serial(args.device, args.baudrate) | |||
except serial.serialutil.SerialException: | |||
sys.stderr.write("error opening %s\n" % args.device) | |||
raise SystemExit(1) | |||
print args.device + ", " + str(args.baudrate) + " baud" | |||
print "^C to exit" | |||
print "--" | |||
term = Miniterm(dev) | |||
term.run() | |||