itm-decode/itm-decode.py
Jim Paris fa5647aaab Reopen stdout as an unbuffered binary stream
We want to dump output directly from the device without any further
processing, to support binary data dumps; we also disable buffering
on stdout, to support piping output to other processes more cleanly.
2020-06-10 18:08:29 -04:00

220 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python3
# Open a serial port and decode ARM ITM trace (SWO) data sent with the
# UART encoding.
import sys
import serial
# ANSI color codes for the color names used by the print helpers.
color_lookup = {
    "red": 31,
    "green": 32,
    "cyan": 36,
    "yellow": 33,
}


def color(name, text):
    """
    Return the bytes `text` wrapped in the escape sequence that selects
    color `name`, followed by the attribute-reset sequence.

    Raises KeyError if `name` is not a key of color_lookup.
    """
    code = color_lookup[name]
    return b"\033[%dm%s\033[0m" % (code, text)
# Binary output stream used by the print helpers below.  It starts out
# as None and must be assigned before printf() or printf_color() is used.
stdout = None


def sprintf_color(name, str, *args):
    """
    Encode the format string `str` as UTF-8, apply `args` to it with
    bytes %-formatting, and return the result wrapped by color().
    """
    return color(name, str.encode('utf-8') % args)


def printf(str, *args):
    """
    Encode the format string `str` as UTF-8, apply `args` to it with
    bytes %-formatting, and write the result to the global `stdout`.
    """
    emit = stdout.write
    emit(str.encode('utf-8') % args)


def printf_color(name, str, *args):
    """
    Like printf(), but the formatted output is wrapped in color `name`.
    """
    emit = stdout.write
    emit(sprintf_color(name, str, *args))
# On Unix systems, receiving SIGUSR1 or SIGUSR2 prints an additional
# marker line in the output stream.  This is just to help clarify the
# output when the chip is externally reset, etc.
import signal


def sigusr1_handler(signum, frame):
    """Signal handler that prints a yellow "--- mark ---" line."""
    printf_color("yellow", "--- mark ---\n")


def sigusr2_handler(signum, frame):
    """Signal handler that prints a yellow "--- reset ---" line."""
    printf_color("yellow", "--- reset ---\n")


try:
    signal.signal(signal.SIGUSR1, sigusr1_handler)
    signal.signal(signal.SIGUSR2, sigusr2_handler)
except AttributeError:
    # The signal module on this platform does not define these signals;
    # run without the marker feature.
    pass
class ResyncException(Exception):
    """Raised when a resync sequence is detected in the input bytes."""
class TimeoutException(Exception):
    """Raised when the input stream reports a read timeout."""
class ITMParser:
    """
    Decoder for a stream of ITM trace bytes.

    `stream` is an iterator that yields one input byte at a time as an
    int, or None to report that no data arrived before the read timeout.
    """

    def __init__(self, stream):
        self.input_stream = stream

    def read_loop(self):
        """
        Yields bytes from the input stream one at a time.  Raises a
        ResyncException if the bytes were a resync sequence, or
        TimeoutException if there was an input timeout.
        """
        sync_count = 0
        while True:
            data = next(self.input_stream)
            if data is None:
                raise TimeoutException()

            # Resync: a run of zero bytes terminated by 0x80.
            # NOTE(review): the threshold of 7 zero bytes is kept as-is
            # from the existing code; confirm it against the ITM
            # synchronization packet definition.
            if data == 0x00:
                sync_count += 1
            elif sync_count >= 7 and data == 0x80:
                raise ResyncException()
            else:
                sync_count = 0

            yield data

    def process(self):
        """
        Main processing loop; restarts the parser any time we get
        a resync sequence.  Never returns.
        """
        while True:
            try:
                # A generator that raised is finished, so a fresh
                # read_loop() is created after every timeout or resync.
                self.synced_stream = self.read_loop()
                while True:
                    try:
                        c = self.next()
                    except TimeoutException:
                        # Timeout for the first byte can be ignored.
                        break
                    try:
                        text = self.parse(c)
                        if text:
                            printf("%s\n", text)
                    except TimeoutException:
                        # Timeout inside a parser should be reported.
                        printf_color("red", "Timeout\n")
                        break
            except ResyncException:
                printf_color("red", "Resync\n")

    def next(self):
        """
        Return the next byte from the current read_loop() generator,
        propagating its ResyncException / TimeoutException.
        """
        return next(self.synced_stream)

    def parse(self, c):
        """
        Process a top-level ITM packet whose header byte is `c`,
        returning the bytes that should be printed to describe it, or
        None if nothing should be printed.
        """
        if c & 0x7f == 0:
            return None  # part of sync packet
        if c == 0x70:
            return sprintf_color("yellow", "overflow")
        if c & 0x0f == 0x00 and c & 0x70 != 0x00:
            return self.parse_timestamp(c)
        # Extension header: bit 3 set, bits 1:0 clear.  (Comparing the
        # masked value against 0x80 could never match, because the mask
        # 0x0b does not include bit 7.)
        if c & 0x0b == 0x08:
            return self.parse_extension(c)
        if c & 0x0f == 0x04:
            return sprintf_color("yellow", "reserved %02x", c)
        if c & 0x04 == 0x00 and c & 0x03 != 0:
            return self.parse_sw(c)
        if c & 0x04 == 0x04 and c & 0x03 != 0:
            return self.parse_hw(c)
        # Defensive fallback for any header not matched above.
        return sprintf_color("red", "unknown %02x", c)

    @staticmethod
    def _sw_header(c):
        """
        Split SWIT header byte `c` into (port, payload length in bytes).
        """
        # The port number occupies bits 7:3 of the header.
        port = (c & 0xf8) >> 3
        # Size bits 1:0 of 1, 2, 3 select a 1, 2 or 4 byte payload.
        length = 1 << ((c & 0x3) - 1)
        return port, length

    def parse_sw(self, c):
        """
        Parse SWIT packet with header byte `c`.

        Single-byte payloads on port 0 are written straight to stdout
        and None is returned; every other packet is returned as a
        colored description.
        """
        port, length = self._sw_header(c)
        payload = 0
        # Payload bytes arrive least-significant byte first.
        for i in range(length):
            payload |= self.next() << (i * 8)
        if port == 0 and length == 1:
            # Dump directly to stdout as binary data
            stdout.write(bytes([payload]))
            return None
        return sprintf_color('cyan', "SWIT port %d payload %0*x",
                             port, 2 * length, payload)

    def parse_hw(self, c):
        """
        Parse HWIT packet (not implemented; only the header is reported)
        """
        return sprintf_color("red", "TODO hw %02x", c)

    def parse_timestamp(self, c):
        """
        Parse timestamp packet (not implemented; only the header is
        reported)
        """
        return sprintf_color("red", "TODO timestamp %02x", c)

    def parse_extension(self, c):
        """
        Parse extension packet (not implemented; only the header is
        reported)
        """
        return sprintf_color("red", "TODO extension %02x", c)
def main(argv):
    """
    Command-line entry point: open the serial port named on the command
    line and decode the ITM data read from it until interrupted.

    `argv` is the full argument vector: argv[0] is used as the program
    name and argv[1:] are parsed as the command-line arguments.

    Raises SystemExit on argument errors, for --help, and (with status
    0) when interrupted with Ctrl-C.
    """
    global stdout
    import argparse
    import os

    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Decode ARM ITM data sent via SWO with UART encoding",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # type=int makes argparse reject a non-numeric baudrate up front.
    parser.add_argument("--baudrate", "-b", metavar="BAUD", type=int,
                        default=4000000, help="SWO data baudrate")
    parser.add_argument("device", metavar="PORT",
                        help="Serial port for SWO input")
    # Parse the arguments we were given rather than implicitly re-reading
    # sys.argv.
    args = parser.parse_args(argv[1:])

    # TODO: add option to backslash-escape output
    # TODO: add option to disable color in output
    # TODO: add option to control output buffering

    # Reopen stdout as an unbuffered binary stream so device output can
    # be written without further processing.
    stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)

    ser = serial.Serial(args.device, args.baudrate)
    printf_color('green', 'ready\n')

    def input_stream():
        """Yield input bytes as ints, or None after a 1s read timeout."""
        while True:
            # Read one byte with a 1s timeout.
            ser.timeout = 1
            data = ser.read(1)
            if not data:
                # Timeout
                yield None
                continue
            yield data[0]

            # Then read as many more as there are immediately
            # available, and send them.  This is more efficient than
            # reading each individual byte, when they're coming in
            # fast.  Once they stop, we'll go back to the normal
            # 1 byte read with timeout.
            ser.timeout = 0
            while True:
                data = ser.read(65536)
                if not data:
                    break
                yield from data

    try:
        ITMParser(input_stream()).process()
    except KeyboardInterrupt:
        raise SystemExit(0)
    finally:
        # Release the serial port on every exit path.
        ser.close()
# Run the decoder only when this file is executed as a script, passing
# the full command line (including the program name) to main().
if __name__ == "__main__":
    main(sys.argv)