Previous commits went back and forth a bit on whether the various APIs should use bytes or strings, but bytes appears to be a better answer, because actual data in streams will always be 7-bit ASCII or raw binary. There's no reason to apply the performance penalty of constantly converting between bytes and strings. One drawback is that lots of code now has to have "b" prefixes on strings, especially in tests, which inflates this commit quite a bit.

tags/nilmdb-2.0.0
@@ -430,7 +430,7 @@ mod_wsgi requires "WSGIChunkedRequest On" to handle | |||
"Transfer-encoding: Chunked" requests. However, `/stream/insert` | |||
doesn't handle this correctly right now, because: | |||
- The `cherrpy.request.body.read()` call needs to be fixed for chunked requests | |||
- The `cherrypy.request.body.read()` call needs to be fixed for chunked requests | |||
- We don't want to just buffer endlessly in the server, and it will | |||
require some thought on how to handle data in chunks (what to do about | |||
@@ -438,3 +438,32 @@ doesn't handle this correctly right now, because: | |||
It is probably better to just keep the endpoint management on the client | |||
side, so leave "WSGIChunkedRequest off" for now. | |||
Unicode & character encoding | |||
---------------------------- | |||
Stream data is passed back and forth as raw `bytes` objects in most | |||
places, including the `nilmdb.client` and command-line interfaces. | |||
This is done partially for performance reasons, and partially to | |||
support the binary insert/extract options, where character-set encoding | |||
would not apply. | |||
For the HTTP server, the raw bytes transferred over HTTP are interpreted | |||
as follows: | |||
- For `/stream/insert`, the client-provided `Content-Type` is ignored, | |||
and the data is read as if it were `application/octet-stream`. | |||
- For `/stream/extract`, the returned data is `application/octet-stream`. | |||
- All other endpoints communicate via JSON, which is specified to always | |||
be encoded as UTF-8. This includes: | |||
- `/version` | |||
- `/dbinfo` | |||
- `/stream/list` | |||
- `/stream/create` | |||
- `/stream/destroy` | |||
- `/stream/rename` | |||
- `/stream/get_metadata` | |||
- `/stream/set_metadata` | |||
- `/stream/update_metadata` | |||
- `/stream/remove` | |||
- `/stream/intervals` |
@@ -181,7 +181,7 @@ class Client(object): | |||
} | |||
if binary: | |||
params["binary"] = 1 | |||
return self.http.put("stream/insert", data, params, binary = binary) | |||
return self.http.put("stream/insert", data, params, binary) | |||
def stream_intervals(self, path, start = None, end = None, diffpath = None): | |||
""" | |||
@@ -370,10 +370,10 @@ class StreamInserter(object): | |||
there isn't one.""" | |||
start = 0 | |||
while True: | |||
end = block.find('\n', start) | |||
end = block.find(b'\n', start) | |||
if end < 0: | |||
raise IndexError | |||
if block[start] != '#': | |||
if block[start] != b'#'[0]: | |||
return (start, (end + 1)) | |||
start = end + 1 | |||
@@ -381,12 +381,12 @@ class StreamInserter(object): | |||
"""Return the (start, end) indices of the last full line in | |||
block[:length] that isn't a comment, or raise IndexError if | |||
there isn't one.""" | |||
end = block.rfind('\n') | |||
end = block.rfind(b'\n') | |||
if end <= 0: | |||
raise IndexError | |||
while True: | |||
start = block.rfind('\n', 0, end) | |||
if block[start + 1] != '#': | |||
start = block.rfind(b'\n', 0, end) | |||
if block[start + 1] != b'#'[0]: | |||
return ((start + 1), end) | |||
if start == -1: | |||
raise IndexError | |||
@@ -396,7 +396,7 @@ class StreamInserter(object): | |||
"""Send data currently in the block. The data sent will | |||
consist of full lines only, so some might be left over.""" | |||
# Build the full string to send | |||
block = "".join(self._block_data) | |||
block = b"".join(self._block_data) | |||
start_ts = self._interval_start | |||
if start_ts is None: | |||
@@ -413,7 +413,7 @@ class StreamInserter(object): | |||
# or the timestamp of the last line plus epsilon. | |||
end_ts = self._interval_end | |||
try: | |||
if block[-1] != '\n': | |||
if block[-1] != b'\n'[0]: | |||
raise ValueError("final block didn't end with a newline") | |||
if end_ts is None: | |||
(spos, epos) = self._get_last_noncomment(block) | |||
@@ -69,9 +69,9 @@ def cmd_extract(self): | |||
printed = False | |||
if self.args.binary: | |||
printer = sys.stdout.write | |||
printer = sys.stdout.buffer.write | |||
else: | |||
printer = print | |||
printer = lambda x: print(x.decode('utf-8')) | |||
bare = self.args.bare | |||
count = self.args.count | |||
for dataline in self.client.stream_extract(self.args.path, | |||
@@ -83,7 +83,7 @@ def cmd_extract(self): | |||
if bare and not count: | |||
# Strip timestamp (first element). Doesn't make sense | |||
# if we are only returning a count. | |||
dataline = ' '.join(dataline.split(' ')[1:]) | |||
dataline = b' '.join(dataline.split(b' ')[1:]) | |||
printer(dataline) | |||
printed = True | |||
if not printed: | |||
@@ -87,7 +87,7 @@ def cmd_insert(self): | |||
try: | |||
filename = arg.file | |||
if filename == '-': | |||
infile = sys.stdin | |||
infile = sys.stdin.buffer | |||
else: | |||
try: | |||
infile = open(filename, "rb") | |||
@@ -104,7 +104,7 @@ def cmd_insert(self): | |||
if arg.timestamp: | |||
data = timestamper.TimestamperRate(infile, arg.start, arg.rate) | |||
else: | |||
data = iter(lambda: infile.read(1048576), '') | |||
data = iter(lambda: infile.read(1048576), b'') | |||
# Print info | |||
if not arg.quiet: | |||
@@ -41,10 +41,10 @@ def cmd_metadata(self): | |||
if self.args.set is not None or self.args.update is not None: | |||
# Either set, or update | |||
if self.args.set is not None: | |||
keyvals = list(map(nilmdb.utils.str.decode, self.args.set)) | |||
keyvals = self.args.set | |||
handler = self.client.stream_set_metadata | |||
else: | |||
keyvals = list(map(nilmdb.utils.str.decode, self.args.update)) | |||
keyvals = self.args.update | |||
handler = self.client.stream_update_metadata | |||
# Extract key=value pairs | |||
@@ -64,7 +64,7 @@ def cmd_metadata(self): | |||
# Delete (by setting values to empty strings) | |||
keys = None | |||
if self.args.delete: | |||
keys = list(map(nilmdb.utils.str.decode, self.args.delete)) | |||
keys = list(self.args.delete) | |||
try: | |||
data = self.client.stream_get_metadata(self.args.path, keys) | |||
for key in data: | |||
@@ -76,7 +76,7 @@ def cmd_metadata(self): | |||
# Get (or unspecified) | |||
keys = None | |||
if self.args.get: | |||
keys = list(map(nilmdb.utils.str.decode, self.args.get)) | |||
keys = list(self.args.get) | |||
try: | |||
data = self.client.stream_get_metadata(self.args.path, keys) | |||
except nilmdb.client.ClientError as e: | |||
@@ -85,6 +85,4 @@ def cmd_metadata(self): | |||
# Print nonexistant keys as having empty value | |||
if value is None: | |||
value = "" | |||
printf("%s=%s\n", | |||
nilmdb.utils.str.encode(key), | |||
nilmdb.utils.str.encode(value)) | |||
printf("%s=%s\n", key, value) |
@@ -1,5 +1,7 @@ | |||
# -*- coding: utf-8 -*- | |||
raise Exception("todo: fix path bytes issues") | |||
"""Check database consistency, with some ability to fix problems. | |||
This should be able to fix cases where a database gets corrupted due | |||
to unexpected system shutdown, and detect other cases that may cause | |||
@@ -441,6 +441,8 @@ class Table(object): | |||
are non-monotonic, or don't fall between 'start' and 'end', | |||
a ValueError is raised. | |||
Note that data is always of 'bytes' type. | |||
If 'binary' is True, the data should be in raw binary format | |||
instead: little-endian, matching the current table's layout, | |||
including the int64 timestamp. | |||
@@ -481,9 +483,9 @@ class Table(object): | |||
if binary: | |||
raise IndexError | |||
bad = data.splitlines()[linenum-1] | |||
bad += '\n' + ' ' * (colnum - 1) + '^' | |||
bad += b'\n' + b' ' * (colnum - 1) + b'^' | |||
except IndexError: | |||
bad = "" | |||
bad = b"" | |||
if errtype == rocket.ERR_NON_MONOTONIC: | |||
err = "timestamp is not monotonically increasing" | |||
elif errtype == rocket.ERR_OUT_OF_INTERVAL: | |||
@@ -497,8 +499,9 @@ class Table(object): | |||
timestamp_to_string(end)) | |||
else: | |||
err = str(obj) | |||
bad_str = bad.decode('utf-8', errors='backslashreplace') | |||
raise ValueError("error parsing input data: " + | |||
where + err + "\n" + bad) | |||
where + err + "\n" + bad_str) | |||
tot_rows += added_rows | |||
except Exception: | |||
# Some failure, so try to roll things back by truncating or | |||
@@ -538,10 +541,7 @@ class Table(object): | |||
ret.append(f.extract_string(offset, count)) | |||
remaining -= count | |||
row += count | |||
if binary: | |||
return b"".join(ret) | |||
else: | |||
return "".join(ret) | |||
return b"".join(ret) | |||
def __getitem__(self, row): | |||
"""Extract timestamps from a row, with table[n] notation.""" | |||
@@ -12,7 +12,7 @@ Manages both the SQL database and the table storage backend. | |||
import nilmdb.utils | |||
from nilmdb.utils.printf import * | |||
from nilmdb.utils.time import timestamp_to_string | |||
from nilmdb.utils.time import timestamp_to_bytes | |||
from nilmdb.utils.interval import IntervalError | |||
from nilmdb.server.interval import Interval, DBInterval, IntervalSet | |||
@@ -617,8 +617,8 @@ class NilmDB(object): | |||
# Add markup | |||
if markup: | |||
result.append("# interval-start " + | |||
timestamp_to_string(interval.start) + "\n") | |||
result.append(b"# interval-start " + | |||
timestamp_to_bytes(interval.start) + b"\n") | |||
# Gather these results up | |||
result.append(table.get_data(row_start, row_end, binary)) | |||
@@ -629,16 +629,17 @@ class NilmDB(object): | |||
# Add markup, and exit if restart is set. | |||
if restart is not None: | |||
if markup: | |||
result.append("# interval-end " + | |||
timestamp_to_string(restart) + "\n") | |||
result.append(b"# interval-end " + | |||
timestamp_to_bytes(restart) + b"\n") | |||
break | |||
if markup: | |||
result.append("# interval-end " + | |||
timestamp_to_string(interval.end) + "\n") | |||
result.append(b"# interval-end " + | |||
timestamp_to_bytes(interval.end) + b"\n") | |||
if count: | |||
return matched | |||
return (b"".join(result), restart) | |||
full_result = b"".join(result) | |||
return (full_result, restart) | |||
def stream_remove(self, path, start = None, end = None): | |||
""" | |||
@@ -279,8 +279,9 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args) | |||
union64_t t64; | |||
int i; | |||
/* Input data is Unicode */ | |||
if (!PyArg_ParseTuple(args, "isiiLLL:append_string", &count, | |||
/* Input data is bytes. Using 'y#' instead of 'y' might be | |||
preferable, but strto* requires the null terminator. */ | |||
if (!PyArg_ParseTuple(args, "iyiiLLL:append_string", &count, | |||
&data, &offset, &linenum, | |||
&ll1, &ll2, &ll3)) | |||
return NULL; | |||
@@ -490,7 +491,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args) | |||
} | |||
/**** | |||
* Extract to a Unicode string | |||
* Extract to binary bytes object containing ASCII text-formatted data | |||
*/ | |||
static PyObject *Rocket_extract_string(Rocket *self, PyObject *args) | |||
@@ -588,7 +589,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args) | |||
str[len++] = '\n'; | |||
} | |||
PyObject *pystr = PyUnicode_FromStringAndSize(str, len); | |||
PyObject *pystr = PyBytes_FromStringAndSize(str, len); | |||
free(str); | |||
return pystr; | |||
err: | |||
@@ -245,6 +245,9 @@ class Stream(NilmApp): | |||
"application/octet-stream for " | |||
"binary data, not " + content_type) | |||
# Note that non-binary data is *not* decoded from bytes to string, | |||
# but rather passed directly to stream_insert. | |||
# Check path and get layout | |||
if len(self.db.stream_list(path = path)) != 1: | |||
raise cherrypy.HTTPError("404", "No such stream: " + path) | |||
@@ -453,11 +456,6 @@ class Server(object): | |||
app_config.update({ 'tools.json_in.force': False, | |||
'tools.json_in.processor': json_to_request_params }) | |||
# Convert Unicode strings to raw bytes on output | |||
app_config.update({ 'tools.encode.text_only': True, | |||
'tools.encode.on': True, | |||
'tools.encode.encoding': 'utf-8' }) | |||
# Send tracebacks in error responses. They're hidden by the | |||
# error_page function for client errors (code 400-499). | |||
app_config.update({ 'request.show_tracebacks' : True }) | |||
@@ -26,6 +26,11 @@ def timestamp_to_string(timestamp): | |||
else: | |||
return str(timestamp) | |||
def timestamp_to_bytes(timestamp): | |||
"""Convert a timestamp (integer microseconds since epoch) to a Python | |||
bytes object""" | |||
return timestamp_to_string(timestamp).encode('utf-8') | |||
def timestamp_to_human(timestamp): | |||
"""Convert a timestamp (integer microseconds since epoch) to a | |||
human-readable string, using the local timezone for display | |||
@@ -10,7 +10,7 @@ class Timestamper(object): | |||
ts_iter: iterator that returns a timestamp string for | |||
each line of the file""" | |||
if isinstance(infile, str): | |||
self.file = open(infile, "r") | |||
self.file = open(infile, "rb") | |||
else: | |||
self.file = infile | |||
self.ts_iter = ts_iter | |||
@@ -22,17 +22,17 @@ class Timestamper(object): | |||
while True: | |||
line = self.file.readline(*args) | |||
if not line: | |||
return "" | |||
if line[0] == '#': | |||
return b"" | |||
if line[0:1] == b'#': | |||
continue | |||
break | |||
try: | |||
return next(self.ts_iter) + line | |||
except StopIteration: | |||
return "" | |||
return b"" | |||
def readlines(self, size = None): | |||
out = "" | |||
out = b"" | |||
while True: | |||
line = self.readline() | |||
out += line | |||
@@ -61,7 +61,7 @@ class TimestamperRate(Timestamper): | |||
end: If specified, raise StopIteration before outputting a value | |||
greater than this.""" | |||
timestamp_to_string = nilmdb.utils.time.timestamp_to_string | |||
timestamp_to_bytes = nilmdb.utils.time.timestamp_to_bytes | |||
rate_to_period = nilmdb.utils.time.rate_to_period | |||
def iterator(start, rate, end): | |||
n = 0 | |||
@@ -70,7 +70,7 @@ class TimestamperRate(Timestamper): | |||
now = start + rate_to_period(rate, n) | |||
if end and now >= end: | |||
return | |||
yield timestamp_to_string(now) + " " | |||
yield timestamp_to_bytes(now) + b" " | |||
n += 1 | |||
Timestamper.__init__(self, infile, iterator(start, rate, end)) | |||
self.start = start | |||
@@ -83,11 +83,11 @@ class TimestamperRate(Timestamper): | |||
class TimestamperNow(Timestamper): | |||
"""Timestamper that uses current time""" | |||
def __init__(self, infile): | |||
timestamp_to_string = nilmdb.utils.time.timestamp_to_string | |||
timestamp_to_bytes = nilmdb.utils.time.timestamp_to_bytes | |||
get_now = nilmdb.utils.time.now | |||
def iterator(): | |||
while True: | |||
yield timestamp_to_string(get_now()) + " " | |||
yield timestamp_to_bytes(get_now()) + b" " | |||
Timestamper.__init__(self, infile, iterator()) | |||
def __str__(self): | |||
return "TimestamperNow(...)" |
@@ -68,10 +68,10 @@ class TestBulkData(object): | |||
x = node[0] # timestamp | |||
raw = [] | |||
for i in range(1000): | |||
raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) | |||
node.append_data("".join(raw[0:1]), 0, 50000) | |||
node.append_data("".join(raw[1:100]), 0, 50000) | |||
node.append_data("".join(raw[100:]), 0, 50000) | |||
raw.append(b"%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) | |||
node.append_data(b"".join(raw[0:1]), 0, 50000) | |||
node.append_data(b"".join(raw[1:100]), 0, 50000) | |||
node.append_data(b"".join(raw[100:]), 0, 50000) | |||
misc_slices = [ 0, 100, slice(None), slice(0), slice(10), | |||
slice(5,10), slice(3,None), slice(3,-3), | |||
@@ -85,8 +85,8 @@ class TestBulkData(object): | |||
# Extract misc slices while appending, to make sure the | |||
# data isn't being added in the middle of the file | |||
for s in [2, slice(1,5), 2, slice(1,5)]: | |||
node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000) | |||
raw.append("0 0 0 0 0 0 0 0 0\n") | |||
node.append_data(b"0 0 0 0 0 0 0 0 0\n", 0, 50000) | |||
raw.append(b"0 0 0 0 0 0 0 0 0\n") | |||
eq_(get_node_slice(s), raw[s]) | |||
# Get some coverage of remove; remove is more fully tested | |||
@@ -98,7 +98,7 @@ class TestClient(object): | |||
# Bad method types | |||
with assert_raises(ClientError): | |||
client.http.put("/stream/list","") | |||
client.http.put("/stream/list",b"") | |||
# Try a bunch of times to make sure the request body is getting consumed | |||
for x in range(10): | |||
with assert_raises(ClientError): | |||
@@ -215,21 +215,21 @@ class TestClient(object): | |||
eq_(result, None) | |||
# It's OK to insert an empty interval | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": 1, "end": 2 }) | |||
eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]]) | |||
client.stream_remove("/newton/prep") | |||
eq_(list(client.stream_intervals("/newton/prep")), []) | |||
# Timestamps can be negative too | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": -2, "end": -1 }) | |||
eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]]) | |||
client.stream_remove("/newton/prep") | |||
eq_(list(client.stream_intervals("/newton/prep")), []) | |||
# Intervals that end at zero shouldn't be any different | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": -1, "end": 0 }) | |||
eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]]) | |||
client.stream_remove("/newton/prep") | |||
@@ -237,27 +237,27 @@ class TestClient(object): | |||
# Try forcing a server request with equal start and end | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": 0, "end": 0 }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("start must precede end", str(e.exception)) | |||
# Invalid times in HTTP request | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": "asdf", "end": 0 }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("invalid start", str(e.exception)) | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
client.http.put("stream/insert", b"", { "path": "/newton/prep", | |||
"start": 0, "end": "asdf" }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("invalid end", str(e.exception)) | |||
# Good content type | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", | |||
client.http.put("stream/insert", b"", | |||
{ "path": "xxxx", "start": 0, "end": 1, | |||
"binary": 1 }, | |||
binary = True) | |||
@@ -265,7 +265,7 @@ class TestClient(object): | |||
# Bad content type | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", | |||
client.http.put("stream/insert", b"", | |||
{ "path": "xxxx", "start": 0, "end": 1, | |||
"binary": 1 }, | |||
binary = False) | |||
@@ -513,68 +513,68 @@ class TestClient(object): | |||
# override _max_data to trigger frequent server updates | |||
ctx._max_data = 15 | |||
ctx.insert("1000 1\n") | |||
ctx.insert(b"1000 1\n") | |||
ctx.insert("1010 ") | |||
ctx.insert("1\n1020 1") | |||
ctx.insert("") | |||
ctx.insert("\n1030 1\n") | |||
ctx.insert(b"1010 ") | |||
ctx.insert(b"1\n1020 1") | |||
ctx.insert(b"") | |||
ctx.insert(b"\n1030 1\n") | |||
ctx.insert("1040 1\n") | |||
ctx.insert("# hello\n") | |||
ctx.insert(" # hello\n") | |||
ctx.insert(" 1050 1\n") | |||
ctx.insert(b"1040 1\n") | |||
ctx.insert(b"# hello\n") | |||
ctx.insert(b" # hello\n") | |||
ctx.insert(b" 1050 1\n") | |||
ctx.finalize() | |||
ctx.insert("1070 1\n") | |||
ctx.insert(b"1070 1\n") | |||
ctx.update_end(1080) | |||
ctx.finalize() | |||
ctx.update_start(1090) | |||
ctx.insert("1100 1\n") | |||
ctx.insert("1110 1\n") | |||
ctx.insert(b"1100 1\n") | |||
ctx.insert(b"1110 1\n") | |||
ctx.send() | |||
ctx.insert("1120 1\n") | |||
ctx.insert("1130 1\n") | |||
ctx.insert("1140 1\n") | |||
ctx.insert(b"1120 1\n") | |||
ctx.insert(b"1130 1\n") | |||
ctx.insert(b"1140 1\n") | |||
ctx.update_end(1160) | |||
ctx.insert("1150 1\n") | |||
ctx.insert(b"1150 1\n") | |||
ctx.update_end(1170) | |||
ctx.insert("1160 1\n") | |||
ctx.insert(b"1160 1\n") | |||
ctx.update_end(1180) | |||
ctx.insert("1170 1" + | |||
" # this is super long" * 100 + | |||
"\n") | |||
ctx.insert(b"1170 1" + | |||
b" # this is super long" * 100 + | |||
b"\n") | |||
ctx.finalize() | |||
ctx.insert("# this is super long" * 100) | |||
ctx.insert(b"# this is super long" * 100) | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/context/test", | |||
1000, 2000) as ctx: | |||
ctx.insert("1180 1\n") | |||
ctx.insert(b"1180 1\n") | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/context/test", | |||
2000, 3000) as ctx: | |||
ctx.insert("1180 1\n") | |||
ctx.insert(b"1180 1\n") | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/context/test") as ctx: | |||
ctx.insert("bogus data\n") | |||
ctx.insert(b"bogus data\n") | |||
with client.stream_insert_context("/context/test", 2000, 3000) as ctx: | |||
# make sure our override wasn't permanent | |||
ne_(ctx._max_data, 15) | |||
ctx.insert("2250 1\n") | |||
ctx.insert(b"2250 1\n") | |||
ctx.finalize() | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/context/test", | |||
3000, 4000) as ctx: | |||
ctx.insert("3010 1\n") | |||
ctx.insert("3020 2\n") | |||
ctx.insert("3030 3\n") | |||
ctx.insert("3040 4\n") | |||
ctx.insert("3040 4\n") # non-monotonic after a few lines | |||
ctx.insert(b"3010 1\n") | |||
ctx.insert(b"3020 2\n") | |||
ctx.insert(b"3030 3\n") | |||
ctx.insert(b"3040 4\n") | |||
ctx.insert(b"3040 4\n") # non-monotonic after a few lines | |||
ctx.finalize() | |||
eq_(list(client.stream_intervals("/context/test")), | |||
@@ -609,9 +609,9 @@ class TestClient(object): | |||
# Insert a region with just a few points | |||
with client.stream_insert_context("/empty/test") as ctx: | |||
ctx.update_start(100) | |||
ctx.insert("140 1\n") | |||
ctx.insert("150 1\n") | |||
ctx.insert("160 1\n") | |||
ctx.insert(b"140 1\n") | |||
ctx.insert(b"150 1\n") | |||
ctx.insert(b"160 1\n") | |||
ctx.update_end(200) | |||
ctx.finalize() | |||
@@ -624,7 +624,7 @@ class TestClient(object): | |||
# Try also creating a completely empty interval from scratch, | |||
# in a few different ways. | |||
client.stream_insert("/empty/test", "", 300, 350) | |||
client.stream_insert("/empty/test", b"", 300, 350) | |||
client.stream_insert("/empty/test", [], 400, 450) | |||
with client.stream_insert_context("/empty/test", 500, 550): | |||
pass | |||
@@ -649,10 +649,10 @@ class TestClient(object): | |||
ctx.finalize() # inserts [1000, 1050] | |||
ctx.finalize() # nothing | |||
ctx.finalize() # nothing | |||
ctx.insert("1100 1\n") | |||
ctx.insert(b"1100 1\n") | |||
ctx.finalize() # inserts [1100, 1101] | |||
ctx.update_start(1199) | |||
ctx.insert("1200 1\n") | |||
ctx.insert(b"1200 1\n") | |||
ctx.update_end(1250) | |||
ctx.finalize() # inserts [1199, 1250] | |||
ctx.update_start(1299) | |||
@@ -660,14 +660,14 @@ class TestClient(object): | |||
ctx.update_end(1350) | |||
ctx.finalize() # nothing | |||
ctx.update_start(1400) | |||
ctx.insert("# nothing!\n") | |||
ctx.insert(b"# nothing!\n") | |||
ctx.update_end(1450) | |||
ctx.finalize() | |||
ctx.update_start(1500) | |||
ctx.insert("# nothing!") | |||
ctx.insert(b"# nothing!") | |||
ctx.update_end(1550) | |||
ctx.finalize() | |||
ctx.insert("# nothing!\n" * 10) | |||
ctx.insert(b"# nothing!\n" * 10) | |||
ctx.finalize() | |||
# implicit last finalize inserts [1400, 1450] | |||
@@ -709,19 +709,19 @@ class TestClient(object): | |||
client.stream_create("/rounding/test", "uint16_1") | |||
with client.stream_insert_context("/rounding/test", | |||
100000000, 200000000.1) as ctx: | |||
ctx.insert("100000000.1 1\n") | |||
ctx.insert("150000000.00003 1\n") | |||
ctx.insert("199999999.4 1\n") | |||
ctx.insert(b"100000000.1 1\n") | |||
ctx.insert(b"150000000.00003 1\n") | |||
ctx.insert(b"199999999.4 1\n") | |||
eq_(list(client.stream_intervals("/rounding/test")), | |||
[ [ 100000000, 200000000 ] ]) | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/rounding/test", | |||
200000000, 300000000) as ctx: | |||
ctx.insert("200000000 1\n") | |||
ctx.insert("250000000 1\n") | |||
ctx.insert(b"200000000 1\n") | |||
ctx.insert(b"250000000 1\n") | |||
# Server will round this and give an error on finalize() | |||
ctx.insert("299999999.99 1\n") | |||
ctx.insert(b"299999999.99 1\n") | |||
client.stream_remove("/rounding/test") | |||
client.stream_destroy("/rounding/test") | |||
@@ -71,9 +71,9 @@ class TestCmdline(object): | |||
( sys.stdin, sys.stdout, sys.stderr ) = self.saved | |||
# Empty input if none provided | |||
if infile is None: | |||
infile = io.StringIO("") | |||
infile = io.TextIOWrapper(io.BytesIO(b"")) | |||
# Capture stderr | |||
errfile = io.StringIO() | |||
errfile = io.TextIOWrapper(io.BytesIO()) | |||
if outfile is None: | |||
# If no output file, capture stdout with stderr | |||
outfile = errfile | |||
@@ -84,8 +84,16 @@ class TestCmdline(object): | |||
sys.exit(0) | |||
except SystemExit as e: | |||
exitcode = e.code | |||
captured = outfile.getvalue() | |||
self.captured = captured | |||
# Capture raw binary output, and also try to decode a Unicode | |||
# string copy. | |||
self.captured_binary = outfile.buffer.getvalue() | |||
try: | |||
outfile.seek(0) | |||
self.captured = outfile.read() | |||
except UnicodeDecodeError: | |||
self.captured = None | |||
self.exitcode = exitcode | |||
def ok(self, arg_string, infile = None): | |||
@@ -629,7 +637,7 @@ class TestCmdline(object): | |||
self.fail("extract -m -B /newton/prep -s min -e max") | |||
self.contain("binary cannot be combined") | |||
self.ok("extract -B /newton/prep -s min -e max") | |||
eq_(len(self.captured), 43200 * (8 + 8*4)) | |||
eq_(len(self.captured_binary), 43200 * (8 + 8*4)) | |||
# markup for 3 intervals, plus extra markup lines whenever we had | |||
# a "restart" from the nilmdb.stream_extract function | |||
@@ -105,7 +105,8 @@ class TestNumpyClient(object): | |||
# Compare. Will be close but not exact because the conversion | |||
# to and from ASCII was lossy. | |||
data = timestamper.TimestamperRate(testfile, start, rate) | |||
actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9) | |||
data_str = b" ".join(data).decode('utf-8', errors='backslashreplace') | |||
actual = np.fromstring(data_str, sep=' ').reshape(14400, 9) | |||
assert(np.allclose(array, actual)) | |||
client.close() | |||
@@ -293,7 +294,7 @@ class TestNumpyClient(object): | |||
# Try also creating a completely empty interval from scratch, | |||
# in a few different ways. | |||
client.stream_insert("/empty/test", "", 300, 350) | |||
client.stream_insert("/empty/test", b"", 300, 350) | |||
client.stream_insert("/empty/test", [], 400, 450) | |||
with client.stream_insert_numpy_context("/empty/test", 500, 550): | |||
pass | |||
@@ -18,62 +18,62 @@ class TestTimestamper(object): | |||
def test_timestamper(self): | |||
def join(list): | |||
return "\n".join(list) + "\n" | |||
return b"\n".join(list) + b"\n" | |||
datetime_tz.localtz_set("America/New_York") | |||
start = nilmdb.utils.time.parse_time("03/24/2012") | |||
lines_in = [ "hello", "world", "hello world", "# commented out" ] | |||
lines_out = [ "1332561600000000 hello", | |||
"1332561600000125 world", | |||
"1332561600000250 hello world" ] | |||
lines_in = [ b"hello", b"world", b"hello world", b"# commented out" ] | |||
lines_out = [ b"1332561600000000 hello", | |||
b"1332561600000125 world", | |||
b"1332561600000250 hello world" ] | |||
# full | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000) | |||
foo = ts.readlines() | |||
eq_(foo, join(lines_out)) | |||
in_("TimestamperRate(..., start=", str(ts)) | |||
# first 30 or so bytes means the first 2 lines | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000) | |||
foo = ts.readlines(30) | |||
eq_(foo, join(lines_out[0:2])) | |||
# stop iteration early | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000, | |||
1332561600000200) | |||
foo = "" | |||
foo = b"" | |||
for line in ts: | |||
foo += line | |||
eq_(foo, join(lines_out[0:2])) | |||
# stop iteration early (readlines) | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000, | |||
1332561600000200) | |||
foo = ts.readlines() | |||
eq_(foo, join(lines_out[0:2])) | |||
# stop iteration really early | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000, | |||
1332561600000000) | |||
foo = ts.readlines() | |||
eq_(foo, "") | |||
eq_(foo, b"") | |||
# use iterator | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperRate(input, start, 8000) | |||
foo = "" | |||
foo = b"" | |||
for line in ts: | |||
foo += line | |||
eq_(foo, join(lines_out)) | |||
# check that TimestamperNow gives similar result | |||
input = io.StringIO(join(lines_in)) | |||
input = io.BytesIO(join(lines_in)) | |||
ts = timestamper.TimestamperNow(input) | |||
foo = ts.readlines() | |||
ne_(foo, join(lines_out)) | |||