Browse Source

Reopen stdout as an unbuffered binary stream

We want to dump output directly from the device without any further
processing, to support binary data dumps; we also disable buffering
on stdout, to support piping output to other processes more cleanly.
master
Jim Paris 3 years ago
parent
commit
fa5647aaab
1 changed files with 35 additions and 20 deletions
  1. +35
    -20
      itm-decode.py

+ 35
- 20
itm-decode.py View File

@@ -6,12 +6,20 @@
import sys
import serial

def printf(str, *args):
print(str % args, end='')

color_lookup = { "red": 31, "green": 32, "cyan": 36, "yellow": 33 }
def color(name, text):
return "\033[%dm%s\033[0m" % (color_lookup[name], text);
return b"\033[%dm%s\033[0m" % (color_lookup[name], text)

stdout = None

def printf(str, *args):
stdout.write(str.encode('utf-8') % args)

def printf_color(name, str, *args):
stdout.write(color(name, str.encode('utf-8') % args))

def sprintf_color(name, str, *args):
return color(name, str.encode('utf-8') % args)

try:
# On Unix systems, print an additional marker in the output stream
@@ -19,9 +27,9 @@ try:
# output when the chip is externally reset, etc.
import signal
def sigusr1_handler(signum, frame):
print(color("yellow", "--- mark ---"))
printf_color("yellow", "--- mark ---\n")
def sigusr2_handler(signum, frame):
print(color("yellow", "--- reset ---"))
printf_color("yellow", "--- reset ---\n")
signal.signal(signal.SIGUSR1, sigusr1_handler)
signal.signal(signal.SIGUSR2, sigusr2_handler)
except AttributeError as e:
@@ -79,13 +87,13 @@ class ITMParser:
try:
text = self.parse(c)
if text:
print(text)
printf("%s\n", text)
except TimeoutException as e:
# Timeout inside a parser should be reported.
print(color("red", "Timeout"))
printf_color("red", "Timeout\n")
break
except ResyncException as e:
print(color("red", "Resync"))
printf_color("red", "Resync\n")

def next(self):
return next(self.synced_stream)
@@ -100,7 +108,7 @@ class ITMParser:
return None # part of sync packet

if c == 0x70:
return color("yellow", "overflow")
return sprintf_color("yellow", "overflow")

if c & 0x0f == 0x00 and c & 0x70 != 0x00:
return self.parse_timestamp(c)
@@ -109,7 +117,7 @@ class ITMParser:
return self.parse_extension(c)

if c & 0x0f == 0x04:
return color("yellow", "reserved %02x" % c)
return sprintf_color("yellow", "reserved %02x", c)

if c & 0x04 == 0x00 and c & 0x03 != 0:
return self.parse_sw(c)
@@ -117,7 +125,7 @@ class ITMParser:
if c & 0x04 == 0x04 and c & 0x03 != 0:
return self.parse_hw(c)

return color("red", "unknown %02x" % c)
return sprintf_color("red", "unknown %02x", c)

def parse_sw(self, c):
"""
@@ -129,29 +137,29 @@ class ITMParser:
for i in range(length):
payload |= self.next() << (i * 8)
if port == 0 and length == 1:
# Dump directly to stdout
print(chr(payload), end='')
# Dump directly to stdout as binary data
stdout.write(bytes([payload]))
return None
msg = "SWIT port %d payload %0*x" % (port, 2 * length, payload)
return color('cyan', msg)
return sprintf_color('cyan', "SWIT port %d payload %0*x",
port, 2 * length, payload)

def parse_hw(self, c):
"""
Parse HWIT packet
"""
return color("red", "TODO hw %02x" % c)
return sprintf_color("red", "TODO hw %02x", c)

def parse_timestamp(self, c):
"""
Parse timestamp packet
"""
return color("red", "TODO timestamp %02x" % c)
return sprintf_color("red", "TODO timestamp %02x", c)

def parse_extension(self, c):
"""
Parse extension packet
"""
return color("red", "TODO extension %02x" % c)
return sprintf_color("red", "TODO extension %02x", c)

def main(argv):
import argparse
@@ -167,9 +175,16 @@ def main(argv):
help="Serial port for SWO input")
args = parser.parse_args()

# TODO: add option to backslash-escape output
# TODO: add option to disable color in output
# TODO: add option to control output buffering
import os
global stdout
stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)

ser = serial.Serial(args.device, args.baudrate)

print(color('green', 'ready'))
printf_color('green', 'ready\n')

def input_stream():
while True:


Loading…
Cancel
Save