Compare commits

...

3 Commits

Author SHA1 Message Date
fa5647aaab Reopen stdout as an unbuffered binary stream
We want to dump output directly from the device without any further
processing, to support binary data dumps; we also disable buffering
on stdout, to support piping output to other processes more cleanly.
2020-06-10 18:08:29 -04:00
b9562c04e0 Add ability to add output marker via external signal
For example, a script to flash the chip can use this to mark the
output so that it's clear when the target was actually reset.
2020-05-19 10:23:15 -04:00
b3399e6134 Improve throughput by reading data in larger chunks
Once a first byte is received, read as many as are immediately
available in larger chunks before going back to single-byte reads.
This prevents dropped data when the host is sending it quickly.
2020-05-19 10:21:51 -04:00

View File

@@ -6,12 +6,34 @@
import sys
import serial
def printf(str, *args):
print(str % args, end='')
color_lookup = { "red": 31, "green": 32, "cyan": 36, "yellow": 33 }
def color(name, text):
return "\033[%dm%s\033[0m" % (color_lookup[name], text);
return b"\033[%dm%s\033[0m" % (color_lookup[name], text)
stdout = None
def printf(str, *args):
stdout.write(str.encode('utf-8') % args)
def printf_color(name, str, *args):
stdout.write(color(name, str.encode('utf-8') % args))
def sprintf_color(name, str, *args):
return color(name, str.encode('utf-8') % args)
try:
# On Unix systems, print an additional marker in the output stream
# when SIGUSR1 or SIGUSR2 is received. This is just to help clarify
# output when the chip is externally reset, etc.
import signal
def sigusr1_handler(signum, frame):
printf_color("yellow", "--- mark ---\n")
def sigusr2_handler(signum, frame):
printf_color("yellow", "--- reset ---\n")
signal.signal(signal.SIGUSR1, sigusr1_handler)
signal.signal(signal.SIGUSR2, sigusr2_handler)
except AttributeError as e:
pass
class ResyncException(Exception):
pass
@@ -65,12 +87,13 @@ class ITMParser:
try:
text = self.parse(c)
if text:
print(text)
printf("%s\n", text)
except TimeoutException as e:
# Timeout inside a parser should be reported.
print(color("red", "Timeout"))
printf_color("red", "Timeout\n")
break
except ResyncException as e:
print(color("red", "Resync"))
printf_color("red", "Resync\n")
def next(self):
return next(self.synced_stream)
@@ -85,7 +108,7 @@ class ITMParser:
return None # part of sync packet
if c == 0x70:
return color("yellow", "overflow")
return sprintf_color("yellow", "overflow")
if c & 0x0f == 0x00 and c & 0x70 != 0x00:
return self.parse_timestamp(c)
@@ -94,7 +117,7 @@ class ITMParser:
return self.parse_extension(c)
if c & 0x0f == 0x04:
return color("yellow", "reserved %02x" % c)
return sprintf_color("yellow", "reserved %02x", c)
if c & 0x04 == 0x00 and c & 0x03 != 0:
return self.parse_sw(c)
@@ -102,7 +125,7 @@ class ITMParser:
if c & 0x04 == 0x04 and c & 0x03 != 0:
return self.parse_hw(c)
return color("red", "unknown %02x" % c)
return sprintf_color("red", "unknown %02x", c)
def parse_sw(self, c):
"""
@@ -114,29 +137,29 @@ class ITMParser:
for i in range(length):
payload |= self.next() << (i * 8)
if port == 0 and length == 1:
# Dump directly to stdout
print(chr(payload), end='')
# Dump directly to stdout as binary data
stdout.write(bytes([payload]))
return None
msg = "SWIT port %d payload %0*x" % (port, 2 * length, payload)
return color('cyan', msg)
return sprintf_color('cyan', "SWIT port %d payload %0*x",
port, 2 * length, payload)
def parse_hw(self, c):
"""
Parse HWIT packet
"""
return color("red", "TODO hw %02x" % c)
return sprintf_color("red", "TODO hw %02x", c)
def parse_timestamp(self, c):
"""
Parse timestamp packet
"""
return color("red", "TODO timestamp %02x" % c)
return sprintf_color("red", "TODO timestamp %02x", c)
def parse_extension(self, c):
"""
Parse extension packet
"""
return color("red", "TODO extension %02x" % c)
return sprintf_color("red", "TODO extension %02x", c)
def main(argv):
import argparse
@@ -152,16 +175,40 @@ def main(argv):
help="Serial port for SWO input")
args = parser.parse_args()
# TODO: add option to backslash-escape output
# TODO: add option to disable color in output
# TODO: add option to control output buffering
import os
global stdout
stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
ser = serial.Serial(args.device, args.baudrate)
ser.timeout = 1
printf_color('green', 'ready\n')
def input_stream():
while True:
# Read one byte with a 1s timeout.
ser.timeout = 1
data = ser.read(1)
if len(data) == 0:
# Timeout
yield None
else:
yield data[0]
continue
yield data[0]
# Then read as many more as there are immediately
# available, and send them. This is more efficient than
# reading each individual byte, when they're coming in
# fast. Once they stop, we'll go back to the normal
# 1 byte read with timeout.
ser.timeout = 0
while True:
data = ser.read(65536)
if len(data) == 0:
break
for c in data:
yield c
try:
ITMParser(input_stream()).process()