|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- #!/usr/bin/env python3
-
- # Open a serial port and decode ARM ITM trace (SWO) data sent with the
- # UART encoding.
-
- import sys
- import serial
-
- color_lookup = { "red": 31, "green": 32, "cyan": 36, "yellow": 33 }
- def color(name, text):
- return b"\033[%dm%s\033[0m" % (color_lookup[name], text)
-
- stdout = None
-
- def printf(str, *args):
- stdout.write(str.encode('utf-8') % args)
-
- def printf_color(name, str, *args):
- stdout.write(color(name, str.encode('utf-8') % args))
-
- def sprintf_color(name, str, *args):
- return color(name, str.encode('utf-8') % args)
-
- try:
- # On Unix systems, print an additional marker in the output stream
- # when SIGUSR1 or SIGUSR2 is received. This is just to help clarify
- # output when the chip is externally reset, etc.
- import signal
- def sigusr1_handler(signum, frame):
- printf_color("yellow", "--- mark ---\n")
- def sigusr2_handler(signum, frame):
- printf_color("yellow", "--- reset ---\n")
- signal.signal(signal.SIGUSR1, sigusr1_handler)
- signal.signal(signal.SIGUSR2, sigusr2_handler)
- except AttributeError as e:
- pass
-
- class ResyncException(Exception):
- pass
-
- class TimeoutException(Exception):
- pass
-
- class ITMParser:
-
- def __init__(self, stream):
- self.input_stream = stream
-
- def read_loop(self):
- """
- Yields bytes from the input stream one at a time. Raises an
- ResyncException if the bytes were a resync sequence, or
- TimeoutException if there was an input timeout.
- """
- sync_count = 0
- while True:
- data = next(self.input_stream)
-
- if data is None:
- raise TimeoutException()
-
- # Resync
- if data == 0x00:
- sync_count += 1
- elif sync_count >= 7 and data == 0x80:
- raise ResyncException()
- else:
- sync_count = 0
-
- yield data
-
- def process(self):
- """
- Main processing loop; restarts the parser any time we get
- a resync sequence.
- """
- while True:
- try:
- self.synced_stream = self.read_loop()
- while True:
- try:
- c = self.next()
- except TimeoutException as e:
- # Timeout for the first byte can be ignored.
- break
-
- try:
- text = self.parse(c)
- if text:
- printf("%s\n", text)
- except TimeoutException as e:
- # Timeout inside a parser should be reported.
- printf_color("red", "Timeout\n")
- break
- except ResyncException as e:
- printf_color("red", "Resync\n")
-
- def next(self):
- return next(self.synced_stream)
-
- def parse(self, c):
- """
- Process top-level ITM packets, returning a string that should be
- printed to describe it.
- """
-
- if c & 0x7f == 0:
- return None # part of sync packet
-
- if c == 0x70:
- return sprintf_color("yellow", "overflow")
-
- if c & 0x0f == 0x00 and c & 0x70 != 0x00:
- return self.parse_timestamp(c)
-
- if c & 0x0b == 0x80:
- return self.parse_extension(c)
-
- if c & 0x0f == 0x04:
- return sprintf_color("yellow", "reserved %02x", c)
-
- if c & 0x04 == 0x00 and c & 0x03 != 0:
- return self.parse_sw(c)
-
- if c & 0x04 == 0x04 and c & 0x03 != 0:
- return self.parse_hw(c)
-
- return sprintf_color("red", "unknown %02x", c)
-
- def parse_sw(self, c):
- """
- Parse SWIT packet
- """
- port = (c & 0x81) >> 3
- length = 1 << ((c & 0x3) - 1)
- payload = 0
- for i in range(length):
- payload |= self.next() << (i * 8)
- if port == 0 and length == 1:
- # Dump directly to stdout as binary data
- stdout.write(bytes([payload]))
- return None
- return sprintf_color('cyan', "SWIT port %d payload %0*x",
- port, 2 * length, payload)
-
- def parse_hw(self, c):
- """
- Parse HWIT packet
- """
- return sprintf_color("red", "TODO hw %02x", c)
-
- def parse_timestamp(self, c):
- """
- Parse timestamp packet
- """
- return sprintf_color("red", "TODO timestamp %02x", c)
-
- def parse_extension(self, c):
- """
- Parse extension packet
- """
- return sprintf_color("red", "TODO extension %02x", c)
-
- def main(argv):
- import argparse
-
- parser = argparse.ArgumentParser(
- prog = argv[0],
- description = "Decode ARM ITM data sent via SWO with UART encoding",
- formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-
- parser.add_argument("--baudrate", "-b", metavar="BAUD", default=4000000,
- help="SWO data baudrate")
- parser.add_argument("device", metavar="PORT",
- help="Serial port for SWO input")
- args = parser.parse_args()
-
- # TODO: add option to backslash-escape output
- # TODO: add option to disable color in output
- # TODO: add option to control output buffering
- import os
- global stdout
- stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
-
- ser = serial.Serial(args.device, args.baudrate)
-
- printf_color('green', 'ready\n')
-
- def input_stream():
- while True:
- # Read one byte with a 1s timeout.
- ser.timeout = 1
- data = ser.read(1)
- if len(data) == 0:
- # Timeout
- yield None
- continue
- yield data[0]
-
- # Then read as many more as there are immediately
- # available, and send them. This is more efficient than
- # reading each individual byte, when they're coming in
- # fast. Once they stop, we'll go back to the normal
- # 1 byte read with timeout.
- ser.timeout = 0
- while True:
- data = ser.read(65536)
- if len(data) == 0:
- break
- for c in data:
- yield c
-
- try:
- ITMParser(input_stream()).process()
- except KeyboardInterrupt:
- raise SystemExit(0)
-
- if __name__ == "__main__":
- main(sys.argv)
|