Compare commits
3 Commits
ed0f7f47f9
...
master
Author | SHA1 | Date | |
---|---|---|---|
fa5647aaab | |||
b9562c04e0 | |||
b3399e6134 |
@@ -6,12 +6,34 @@
|
||||
import sys
|
||||
import serial
|
||||
|
||||
def printf(str, *args):
|
||||
print(str % args, end='')
|
||||
|
||||
color_lookup = { "red": 31, "green": 32, "cyan": 36, "yellow": 33 }
|
||||
def color(name, text):
|
||||
return "\033[%dm%s\033[0m" % (color_lookup[name], text);
|
||||
return b"\033[%dm%s\033[0m" % (color_lookup[name], text)
|
||||
|
||||
stdout = None
|
||||
|
||||
def printf(str, *args):
|
||||
stdout.write(str.encode('utf-8') % args)
|
||||
|
||||
def printf_color(name, str, *args):
|
||||
stdout.write(color(name, str.encode('utf-8') % args))
|
||||
|
||||
def sprintf_color(name, str, *args):
|
||||
return color(name, str.encode('utf-8') % args)
|
||||
|
||||
try:
|
||||
# On Unix systems, print an additional marker in the output stream
|
||||
# when SIGUSR1 or SIGUSR2 is received. This is just to help clarify
|
||||
# output when the chip is externally reset, etc.
|
||||
import signal
|
||||
def sigusr1_handler(signum, frame):
|
||||
printf_color("yellow", "--- mark ---\n")
|
||||
def sigusr2_handler(signum, frame):
|
||||
printf_color("yellow", "--- reset ---\n")
|
||||
signal.signal(signal.SIGUSR1, sigusr1_handler)
|
||||
signal.signal(signal.SIGUSR2, sigusr2_handler)
|
||||
except AttributeError as e:
|
||||
pass
|
||||
|
||||
class ResyncException(Exception):
|
||||
pass
|
||||
@@ -65,12 +87,13 @@ class ITMParser:
|
||||
try:
|
||||
text = self.parse(c)
|
||||
if text:
|
||||
print(text)
|
||||
printf("%s\n", text)
|
||||
except TimeoutException as e:
|
||||
# Timeout inside a parser should be reported.
|
||||
print(color("red", "Timeout"))
|
||||
printf_color("red", "Timeout\n")
|
||||
break
|
||||
except ResyncException as e:
|
||||
print(color("red", "Resync"))
|
||||
printf_color("red", "Resync\n")
|
||||
|
||||
def next(self):
|
||||
return next(self.synced_stream)
|
||||
@@ -85,7 +108,7 @@ class ITMParser:
|
||||
return None # part of sync packet
|
||||
|
||||
if c == 0x70:
|
||||
return color("yellow", "overflow")
|
||||
return sprintf_color("yellow", "overflow")
|
||||
|
||||
if c & 0x0f == 0x00 and c & 0x70 != 0x00:
|
||||
return self.parse_timestamp(c)
|
||||
@@ -94,7 +117,7 @@ class ITMParser:
|
||||
return self.parse_extension(c)
|
||||
|
||||
if c & 0x0f == 0x04:
|
||||
return color("yellow", "reserved %02x" % c)
|
||||
return sprintf_color("yellow", "reserved %02x", c)
|
||||
|
||||
if c & 0x04 == 0x00 and c & 0x03 != 0:
|
||||
return self.parse_sw(c)
|
||||
@@ -102,7 +125,7 @@ class ITMParser:
|
||||
if c & 0x04 == 0x04 and c & 0x03 != 0:
|
||||
return self.parse_hw(c)
|
||||
|
||||
return color("red", "unknown %02x" % c)
|
||||
return sprintf_color("red", "unknown %02x", c)
|
||||
|
||||
def parse_sw(self, c):
|
||||
"""
|
||||
@@ -114,29 +137,29 @@ class ITMParser:
|
||||
for i in range(length):
|
||||
payload |= self.next() << (i * 8)
|
||||
if port == 0 and length == 1:
|
||||
# Dump directly to stdout
|
||||
print(chr(payload), end='')
|
||||
# Dump directly to stdout as binary data
|
||||
stdout.write(bytes([payload]))
|
||||
return None
|
||||
msg = "SWIT port %d payload %0*x" % (port, 2 * length, payload)
|
||||
return color('cyan', msg)
|
||||
return sprintf_color('cyan', "SWIT port %d payload %0*x",
|
||||
port, 2 * length, payload)
|
||||
|
||||
def parse_hw(self, c):
|
||||
"""
|
||||
Parse HWIT packet
|
||||
"""
|
||||
return color("red", "TODO hw %02x" % c)
|
||||
return sprintf_color("red", "TODO hw %02x", c)
|
||||
|
||||
def parse_timestamp(self, c):
|
||||
"""
|
||||
Parse timestamp packet
|
||||
"""
|
||||
return color("red", "TODO timestamp %02x" % c)
|
||||
return sprintf_color("red", "TODO timestamp %02x", c)
|
||||
|
||||
def parse_extension(self, c):
|
||||
"""
|
||||
Parse extension packet
|
||||
"""
|
||||
return color("red", "TODO extension %02x" % c)
|
||||
return sprintf_color("red", "TODO extension %02x", c)
|
||||
|
||||
def main(argv):
|
||||
import argparse
|
||||
@@ -152,16 +175,40 @@ def main(argv):
|
||||
help="Serial port for SWO input")
|
||||
args = parser.parse_args()
|
||||
|
||||
# TODO: add option to backslash-escape output
|
||||
# TODO: add option to disable color in output
|
||||
# TODO: add option to control output buffering
|
||||
import os
|
||||
global stdout
|
||||
stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
|
||||
|
||||
ser = serial.Serial(args.device, args.baudrate)
|
||||
ser.timeout = 1
|
||||
|
||||
printf_color('green', 'ready\n')
|
||||
|
||||
def input_stream():
|
||||
while True:
|
||||
# Read one byte with a 1s timeout.
|
||||
ser.timeout = 1
|
||||
data = ser.read(1)
|
||||
if len(data) == 0:
|
||||
# Timeout
|
||||
yield None
|
||||
else:
|
||||
yield data[0]
|
||||
continue
|
||||
yield data[0]
|
||||
|
||||
# Then read as many more as there are immediately
|
||||
# available, and send them. This is more efficient than
|
||||
# reading each individual byte, when they're coming in
|
||||
# fast. Once they stop, we'll go back to the normal
|
||||
# 1 byte read with timeout.
|
||||
ser.timeout = 0
|
||||
while True:
|
||||
data = ser.read(65536)
|
||||
if len(data) == 0:
|
||||
break
|
||||
for c in data:
|
||||
yield c
|
||||
|
||||
try:
|
||||
ITMParser(input_stream()).process()
|
||||
|
Reference in New Issue
Block a user